Posted to user@flink.apache.org by Shinhyung Yang <sh...@gmail.com> on 2016/03/31 16:50:59 UTC

scaling a flink streaming application on a single node

Dear flink users and developers,

I am trying to test how a Flink streaming application scales on a
single node, and I summarize my configuration and preliminary results
here. It would be really helpful if you could take some time to review
my settings.

test application: flink-1.0.0/examples/streaming/WordCount.jar
input file: enwiki-20160305-pages-articles-multistream-index.txt
(747,757,155 bytes)
(https://dumps.wikimedia.org/enwiki/20160305/enwiki-20160305-pages-articles-multistream-index.txt.bz2)
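
For reference, the pipeline inside the bundled example boils down to
roughly the following (a sketch reconstructed from memory of the Flink
1.0 streaming examples, so details may differ from the shipped jar; the
comments map the operators to the stage names that appear in the
results below):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> counts = env
            .readTextFile(args[0])    // "Source: Read Text File Source"
            .flatMap(new Tokenizer()) // "Flat Map" (chained with the source)
            .keyBy(0)                 // partition by word
            .sum(1);                  // "Keyed Aggregation"

        counts.print();               // "Sink: Unnamed"
        env.execute("Streaming WordCount");
    }

    // Splits each line into lowercase words and emits (word, 1) pairs.
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}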

Running environment is as follows:

cpu: 4 * AMD Opteron 6378 (16 cores each), 2.4 GHz
memory: 120 GB
os: CentOS 7.2
vm: Java 8u74
flink: flink-1.0.0

My flink configuration is here (modified ones):

jobmanager.heap.mb: 1024
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 64
taskmanager.network.numberOfBuffers: 8192

Keeping the configuration above fixed throughout the test, I changed
only parallelism.default between the test cases (the equivalent
command-line invocation is sketched right after this list):

parallelism.default: 1
parallelism.default: 2
parallelism.default: 4
parallelism.default: 8
parallelism.default: 16
parallelism.default: 32
parallelism.default: 64

The full results are at the end of this message.

It seems to scale well up to parallelism 8, and usually ``Source ->
Flat Map'' scales better than ``Keyed Aggregation -> Sink''. The
completion times of the parallel ``Keyed Aggregation -> Sink'' subtasks
are also more consistent with each other than those of the ``Source ->
Flat Map'' subtasks.

Do you see anything that I need to fix in the Flink configuration or
the JVM configuration (which I did not touch for this experiment) to
improve the performance? In particular, the result I got with
parallelism 64 does not look good to me. I would also really
appreciate any other suggestions that might be worth trying.
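
For reference, JVM flags for the JobManager and TaskManager JVMs can be
passed through env.java.opts in flink-conf.yaml; a hypothetical
example, where the particular GC flags are illustrative only:

env.java.opts: -XX:+UseG1GC -XX:+PrintGCDetails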

Thank you.
With best regards,
Shinhyung Yang

#==============================================================================
# parallelism.default: 1
#==============================================================================
Source: Read Text File Source -> Flat Map(1/1): 34m 57s
Keyed Aggregation -> Sink: Unnamed(1/1): 47m 05s

#==============================================================================
# parallelism.default: 2
#==============================================================================
Source: Read Text File Source -> Flat Map(1/2): 25m 56s
Source: Read Text File Source -> Flat Map(2/2): 26m 27s
Keyed Aggregation -> Sink: Unnamed(1/2):  34m 09s
Keyed Aggregation -> Sink: Unnamed(2/2):  34m 08s

#==============================================================================
# parallelism.default: 4
#==============================================================================
Source: Read Text File Source -> Flat Map(2/4): 21m 39s
Source: Read Text File Source -> Flat Map(1/4): 21m 39s
Source: Read Text File Source -> Flat Map(4/4): 22m 30s
Source: Read Text File Source -> Flat Map(3/4): 22m 31s
Keyed Aggregation -> Sink: Unnamed(3/4): 23m 20s
Keyed Aggregation -> Sink: Unnamed(1/4): 24m 13s
Keyed Aggregation -> Sink: Unnamed(4/4): 24m 57s
Keyed Aggregation -> Sink: Unnamed(2/4): 25m 35s

#==============================================================================
# parallelism.default: 8
#==============================================================================
Source: Read Text File Source -> Flat Map(1/8): 21m 06s
Source: Read Text File Source -> Flat Map(7/8): 21m 32s
Source: Read Text File Source -> Flat Map(2/8): 21m 45s
Source: Read Text File Source -> Flat Map(6/8): 21m 45s
Source: Read Text File Source -> Flat Map(4/8): 21m 45s
Source: Read Text File Source -> Flat Map(3/8): 21m 45s
Source: Read Text File Source -> Flat Map(8/8): 21m 55s
Source: Read Text File Source -> Flat Map(5/8): 21m 55s
Keyed Aggregation -> Sink: Unnamed(4/8):  22m 08s
Keyed Aggregation -> Sink: Unnamed(3/8):  22m 28s
Keyed Aggregation -> Sink: Unnamed(7/8):  22m 58s
Keyed Aggregation -> Sink: Unnamed(6/8):  22m 59s
Keyed Aggregation -> Sink: Unnamed(5/8):  22m 59s
Keyed Aggregation -> Sink: Unnamed(2/8):  23m 00s
Keyed Aggregation -> Sink: Unnamed(1/8):  23m 47s
Keyed Aggregation -> Sink: Unnamed(8/8):  23m 51s

#==============================================================================
# parallelism.default: 16
#==============================================================================
Source: Read Text File Source -> Flat Map(14/16): 08m 57s
Source: Read Text File Source -> Flat Map(5/16): 13m 00s
Source: Read Text File Source -> Flat Map(10/16): 16m 49s
Source: Read Text File Source -> Flat Map(15/16): 16m 49s
Source: Read Text File Source -> Flat Map(6/16): 17m 54s
Source: Read Text File Source -> Flat Map(2/16): 18m 40s
Source: Read Text File Source -> Flat Map(4/16): 19m 48s
Source: Read Text File Source -> Flat Map(8/16): 19m 48s
Source: Read Text File Source -> Flat Map(16/16): 20m 49s
Source: Read Text File Source -> Flat Map(9/16): 21m 06s
Source: Read Text File Source -> Flat Map(12/16): 21m 41s
Source: Read Text File Source -> Flat Map(3/16): 22m 08s
Source: Read Text File Source -> Flat Map(11/16): 22m 08s
Source: Read Text File Source -> Flat Map(1/16): 24m 13s
Source: Read Text File Source -> Flat Map(13/16): 24m 13s
Source: Read Text File Source -> Flat Map(7/16): 24m 14s
Keyed Aggregation -> Sink: Unnamed(2/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(12/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(6/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(8/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(14/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(3/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(15/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(4/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(9/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(7/16): 24m 24s
Keyed Aggregation -> Sink: Unnamed(11/16): 24m 24s
Keyed Aggregation -> Sink: Unnamed(13/16): 24m 24s
Keyed Aggregation -> Sink: Unnamed(1/16): 25m 07s
Keyed Aggregation -> Sink: Unnamed(5/16): 25m 08s
Keyed Aggregation -> Sink: Unnamed(10/16): 25m 19s
Keyed Aggregation -> Sink: Unnamed(16/16): 25m 21s

#==============================================================================
# parallelism.default: 32
#==============================================================================
Source: Read Text File Source -> Flat Map(14/32): 05m 25s
Source: Read Text File Source -> Flat Map(2/32): 05m 41s
Source: Read Text File Source -> Flat Map(26/32): 07m 24s
Source: Read Text File Source -> Flat Map(15/32): 09m 35s
Source: Read Text File Source -> Flat Map(9/32): 10m 23s
Source: Read Text File Source -> Flat Map(11/32): 10m 40s
Source: Read Text File Source -> Flat Map(31/32): 10m 40s
Source: Read Text File Source -> Flat Map(27/32): 10m 41s
Source: Read Text File Source -> Flat Map(20/32): 13m 25s
Source: Read Text File Source -> Flat Map(29/32): 15m 02s
Source: Read Text File Source -> Flat Map(5/32): 15m 43s
Source: Read Text File Source -> Flat Map(16/32): 16m 00s
Source: Read Text File Source -> Flat Map(21/32): 16m 18s
Source: Read Text File Source -> Flat Map(6/32): 17m 28s
Source: Read Text File Source -> Flat Map(10/32): 18m 37s
Source: Read Text File Source -> Flat Map(25/32): 18m 37s
Source: Read Text File Source -> Flat Map(19/32): 18m 37s
Source: Read Text File Source -> Flat Map(18/32): 19m 30s
Source: Read Text File Source -> Flat Map(28/32): 19m 48s
Source: Read Text File Source -> Flat Map(8/32): 20m 05s
Source: Read Text File Source -> Flat Map(7/32): 20m 05s
Source: Read Text File Source -> Flat Map(22/32): 20m 17s
Source: Read Text File Source -> Flat Map(1/32): 20m 34s
Source: Read Text File Source -> Flat Map(32/32): 21m 56s
Source: Read Text File Source -> Flat Map(13/32): 21m 56s
Source: Read Text File Source -> Flat Map(12/32): 21m 56s
Source: Read Text File Source -> Flat Map(3/32): 21m 56s
Source: Read Text File Source -> Flat Map(30/32): 21m 56s
Source: Read Text File Source -> Flat Map(24/32): 22m 25s
Source: Read Text File Source -> Flat Map(17/32): 22m 26s
Source: Read Text File Source -> Flat Map(23/32): 22m 38s
Source: Read Text File Source -> Flat Map(4/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(12/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(21/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(22/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(11/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(18/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(5/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(24/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(13/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(6/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(9/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(26/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(2/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(3/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(28/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(15/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(19/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(7/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(27/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(25/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(23/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(14/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(17/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(20/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(29/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(32/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(31/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(4/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(8/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(30/32): 23m 31s
Keyed Aggregation -> Sink: Unnamed(1/32): 23m 31s
Keyed Aggregation -> Sink: Unnamed(10/32): 23m 31s
Keyed Aggregation -> Sink: Unnamed(16/32): 23m 32s

#==============================================================================
# parallelism.default: 64
#==============================================================================
Source: Read Text File Source -> Flat Map(13/64): 12m 48s
Source: Read Text File Source -> Flat Map(38/64): 13m 29s
Source: Read Text File Source -> Flat Map(47/64): 14m 37s
Source: Read Text File Source -> Flat Map(24/64): 17m 56s
Source: Read Text File Source -> Flat Map(25/64): 20m 47s
Source: Read Text File Source -> Flat Map(4/64): 20m 58s
Source: Read Text File Source -> Flat Map(45/64): 21m 55s
Source: Read Text File Source -> Flat Map(14/64): 21m 55s
Source: Read Text File Source -> Flat Map(53/64): 22m 13s
Source: Read Text File Source -> Flat Map(48/64): 23m 50s
Source: Read Text File Source -> Flat Map(50/64): 23m 50s
Source: Read Text File Source -> Flat Map(16/64): 24m 00s
Source: Read Text File Source -> Flat Map(8/64): 24m 11s
Source: Read Text File Source -> Flat Map(3/64): 24m 11s
Source: Read Text File Source -> Flat Map(12/64): 24m 23s
Source: Read Text File Source -> Flat Map(59/64): 24m 23s
Source: Read Text File Source -> Flat Map(49/64): 24m 23s
Source: Read Text File Source -> Flat Map(35/64): 25m 15s
Source: Read Text File Source -> Flat Map(64/64): 26m 11s
Source: Read Text File Source -> Flat Map(40/64): 27m 09s
Source: Read Text File Source -> Flat Map(54/64): 27m 38s
Source: Read Text File Source -> Flat Map(5/64): 28m 56s
Source: Read Text File Source -> Flat Map(28/64): 30m 43s
Source: Read Text File Source -> Flat Map(17/64): 30m 43s
Source: Read Text File Source -> Flat Map(11/64): 30m 43s
Source: Read Text File Source -> Flat Map(7/64): 31m 47s
Source: Read Text File Source -> Flat Map(18/64): 31m 54s
Source: Read Text File Source -> Flat Map(23/64): 31m 54s
Source: Read Text File Source -> Flat Map(32/64): 32m 46s
Source: Read Text File Source -> Flat Map(60/64): 32m 46s
Source: Read Text File Source -> Flat Map(52/64): 32m 46s
Source: Read Text File Source -> Flat Map(19/64): 33m 33s
Source: Read Text File Source -> Flat Map(10/64): 33m 33s
Source: Read Text File Source -> Flat Map(61/64): 33m 59s
Source: Read Text File Source -> Flat Map(2/64): 34m 18s
Source: Read Text File Source -> Flat Map(22/64): 34m 18s
Source: Read Text File Source -> Flat Map(6/64): 34m 18s
Source: Read Text File Source -> Flat Map(26/64): 34m 58s
Source: Read Text File Source -> Flat Map(56/64): 36m 22s
Source: Read Text File Source -> Flat Map(36/64): 37m 07s
Source: Read Text File Source -> Flat Map(42/64): 37m 19s
Source: Read Text File Source -> Flat Map(20/64): 37m 50s
Source: Read Text File Source -> Flat Map(55/64): 38m 34s
Source: Read Text File Source -> Flat Map(63/64): 38m 47s
Source: Read Text File Source -> Flat Map(1/64): 38m 47s
Source: Read Text File Source -> Flat Map(43/64): 40m 19s
Source: Read Text File Source -> Flat Map(51/64): 40m 37s
Source: Read Text File Source -> Flat Map(46/64): 41m 58s
Source: Read Text File Source -> Flat Map(9/64): 42m 04s
Source: Read Text File Source -> Flat Map(57/64): 42m 41s
Source: Read Text File Source -> Flat Map(33/64): 43m 25s
Source: Read Text File Source -> Flat Map(30/64): 43m 25s
Source: Read Text File Source -> Flat Map(44/64): 45m 04s
Source: Read Text File Source -> Flat Map(34/64): 46m 17s
Source: Read Text File Source -> Flat Map(39/64): 46m 40s
Source: Read Text File Source -> Flat Map(58/64): 46m 49s
Source: Read Text File Source -> Flat Map(27/64): 47m 45s
Source: Read Text File Source -> Flat Map(29/64): 47m 45s
Source: Read Text File Source -> Flat Map(31/64): 48m 21s
Source: Read Text File Source -> Flat Map(21/64): 48m 38s
Source: Read Text File Source -> Flat Map(41/64): 49m 41s
Source: Read Text File Source -> Flat Map(15/64): 53m 41s
Source: Read Text File Source -> Flat Map(37/64): 54m 27s
Source: Read Text File Source -> Flat Map(62/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(36/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(61/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(64/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(25/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(23/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(2/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(15/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(18/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(34/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(9/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(27/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(40/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(54/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(24/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(19/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(56/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(8/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(46/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(6/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(58/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(59/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(10/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(55/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(38/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(7/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(20/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(5/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(49/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(12/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(28/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(31/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(29/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(50/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(60/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(26/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(43/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(17/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(47/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(37/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(3/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(13/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(14/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(48/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(30/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(63/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(45/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(35/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(51/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(22/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(44/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(32/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(42/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(52/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(41/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(39/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(4/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(21/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(62/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(57/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(33/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(1/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(53/64): 55m 24s
Keyed Aggregation -> Sink: Unnamed(11/64): 55m 24s
Keyed Aggregation -> Sink: Unnamed(16/64): 55m 25s

Re: scaling a flink streaming application on a single node

Posted by Ufuk Celebi <uc...@apache.org>.
Just to clarify: Shinhyung is running on a single node with 4 CPUs,
each having 16 cores.

On Mon, Apr 4, 2016 at 10:32 AM, Robert Metzger <rm...@apache.org> wrote:
> Hi,
>
> usually it doesn't make sense to run multiple task managers on a single
> machine to get more slots.
> Your machine has only 4 CPU cores, so you are just putting a lot of pressure
> on the CPU scheduler.
>
> On Thu, Mar 31, 2016 at 7:16 PM, Shinhyung Yang <sh...@gmail.com>
> wrote:
>>
>> Thank you for replying!
>>
>> I am trying to do this on a single machine in fact. Since it has 64
>> cores, it would be interesting to look at the performance in that
>> regard.
>>
>> > How many machines are you using for this?
>> >
>> > The fact that you are giving 64 slots to each TaskManager means that a
>> > single TaskManager may end up executing all 64 pipelines. That would
>> > heavily
>> > overload that TaskManager and cause heavy degradation.
>>
>> Does it make sense to run multiple TaskManagers on a single machine
>> if 64 slots are too many for one TaskManager?
>>
>> > If, for example, you use 16 machines, then give each machine 4 task
>> > slots
>> > (total of 64 slots across all machines)
>> > That way, the final run (parallelism 64) will be guaranteed to be spread
>> > across all machines.
>>
>> My intention for the experiment at the moment is to try to scale the
>> application up on a single machine to its maximum before moving on to
>> run the experiment on multiple machines.
>>
>> Thank you again!
>> With best regards,
>> Shinhyung Yang
>
>

Re: scaling a flink streaming application on a single node

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I am not sure, since people normally don't run Flink on machines this
large; they rather run it on many smaller machines.

It will definitely be interesting to see your new results, where the
job can actually use all the memory available on the machine.

--
aljoscha

On Mon, 4 Apr 2016 at 15:54 Shinhyung Yang <sh...@gmail.com> wrote:

> Dear Aljoscha and Ufuk,
>
> Thank you for clarifying! Yes, I'm running this wordcount application
> on a 64-core machine with 120 GB of RAM allocated for users.
>
> > In that case, the amount of RAM you give to the TaskManager seems too low.
> > Could you try re-running your experiments with:
> > jobmanager.heap.mb: 5000
> > taskmanager.heap.mb: 100000
> >
> > So, about 100 GB of RAM for the TaskManager.
>
> Definitely I will try this! The result will be really interesting for
> sure. In this case, am I still good to go with 64 task slots with a
> single task manager?
>
> Thank you.
> With best regards,
> Shinhyung Yang.
>

Re: scaling a flink streaming application on a single node

Posted by Shinhyung Yang <sh...@gmail.com>.
Dear Aljoscha and Ufuk,

Thank you for clarifying! Yes, I'm running this wordcount application
on a 64-core machine with 120 GB of RAM allocated for users.

> In that case, the amount of RAM you give to the TaskManager seems too low.
> Could you try re-running your experiments with:
> jobmanager.heap.mb: 5000
> taskmanager.heap.mb: 100000
>
> So, about 100 GB of RAM for the TaskManager.

Definitely I will try this! The result will be really interesting for
sure. In this case, am I still good to go with 64 task slots with a
single task manager?

Thank you.
With best regards,
Shinhyung Yang.

Re: scaling a flink streaming application on a single node

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I'm afraid no one read your email carefully. You indeed have one very big
machine with 64 physical CPU cores and 120 GB of RAM, correct?

In that case, the amount of RAM you give to the TaskManager seems too low.
Could you try re-running your experiments with:
jobmanager.heap.mb: 5000
taskmanager.heap.mb: 100000

So, about 100 GB of RAM for the TaskManager.

Cheers,
Aljoscha

On Mon, 4 Apr 2016 at 10:32 Robert Metzger <rm...@apache.org> wrote:

> Hi,
>
> usually it doesn't make sense to run multiple task managers on a single
> machine to get more slots.
> Your machine has only 4 CPU cores, so you are just putting a lot of
> pressure on the CPU scheduler.
>
> On Thu, Mar 31, 2016 at 7:16 PM, Shinhyung Yang <sh...@gmail.com>
> wrote:
>
>> Thank you for replying!
>>
>> I am trying to do this on a single machine in fact. Since it has 64
>> cores, it would be interesting to look at the performance in that
>> regard.
>>
>> > How many machines are you using for this?
>> >
>> > The fact that you are giving 64 slots to each TaskManager means that a
>> > single TaskManager may end up executing all 64 pipelines. That would
>> heavily
>> > overload that TaskManager and cause heavy degradation.
>>
>> Does it make sense to run multiple TaskManagers on a single machine
>> if 64 slots are too many for one TaskManager?
>>
>> > If, for example, you use 16 machines, then give each machine 4 task
>> slots
>> > (total of 64 slots across all machines)
>> > That way, the final run (parallelism 64) will be guaranteed to be spread
>> > across all machines.
>>
>> My intention for the experiment at the moment is to try to scale the
>> application up on a single machine to its maximum before moving on to
>> run the experiment on multiple machines.
>>
>> Thank you again!
>> With best regards,
>> Shinhyung Yang
>>
>
>

Re: scaling a flink streaming application on a single node

Posted by Robert Metzger <rm...@apache.org>.
Hi,

usually it doesn't make sense to run multiple task managers on a single
machine to get more slots.
Your machine has only 4 CPU cores, so you are just putting a lot of
pressure on the CPU scheduler.
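
For context, running multiple TaskManagers on one host in the
standalone setup means invoking the daemon script once per instance; a
sketch, assuming the standard binary distribution permits multiple
instances per host:

bin/taskmanager.sh start   # first TaskManager on this host
bin/taskmanager.sh start   # second TaskManager on the same host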

On Thu, Mar 31, 2016 at 7:16 PM, Shinhyung Yang <sh...@gmail.com>
wrote:

> Thank you for replying!
>
> I am trying to do this on a single machine in fact. Since it has 64
> cores, it would be interesting to look at the performance in that
> regard.
>
> > How many machines are you using for this?
> >
> > The fact that you are giving 64 slots to each TaskManager means that a
> > single TaskManager may end up executing all 64 pipelines. That would
> heavily
> > overload that TaskManager and cause heavy degradation.
>
> Does it make sense to run multiple TaskManagers on a single machine
> if 64 slots are too many for one TaskManager?
>
> > If, for example, you use 16 machines, then give each machine 4 task slots
> > (total of 64 slots across all machines)
> > That way, the final run (parallelism 64) will be guaranteed to be spread
> > across all machines.
>
> My intention for the experiment at the moment is to try to scale the
> application up on a single machine to its maximum before moving on to
> run the experiment on multiple machines.
>
> Thank you again!
> With best regards,
> Shinhyung Yang
>

Re: scaling a flink streaming application on a single node

Posted by Shinhyung Yang <sh...@gmail.com>.
Thank you for replying!

I am trying to do this on a single machine in fact. Since it has 64
cores, it would be interesting to look at the performance in that
regard.

> How many machines are you using for this?
>
> The fact that you are giving 64 slots to each TaskManager means that a
> single TaskManager may end up executing all 64 pipelines. That would heavily
> overload that TaskManager and cause heavy degradation.

Does it make sense to run multiple TaskManagers on a single machine
if 64 slots are too many for one TaskManager?

> If, for example, you use 16 machines, then give each machine 4 task slots
> (total of 64 slots across all machines)
> That way, the final run (parallelism 64) will be guaranteed to be spread
> across all machines.

My intention for the experiment at the moment is to try to scale the
application up on a single machine to its maximum before moving on to
run the experiment on multiple machines.

Thank you again!
With best regards,
Shinhyung Yang

Re: scaling a flink streaming application on a single node

Posted by Stephan Ewen <se...@apache.org>.
Hi!

How many machines are you using for this?

The fact that you are giving 64 slots to each TaskManager means that a
single TaskManager may end up executing all 64 pipelines. That would
heavily overload that TaskManager and cause heavy degradation.

If, for example, you use 16 machines, then give each machine 4 task slots
(a total of 64 slots across all machines).
That way, the final run (parallelism 64) will be guaranteed to be spread
across all machines.
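
In flink-conf.yaml terms (the same keys as in the original post), that
per-machine setup would look roughly like this:

taskmanager.numberOfTaskSlots: 4
parallelism.default: 64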

Greetings,
Stephan


On Thu, Mar 31, 2016 at 4:50 PM, Shinhyung Yang <sh...@gmail.com>
wrote:

> [original message and results snipped; identical to the post at the top of the thread]