Posted to user@flink.apache.org by "m@xi" <ma...@gmail.com> on 2017/11/08 14:19:06 UTC

Weird performance on custom Hashjoin w.r.t. parallelism

Hello everyone!

I have implemented a custom parallel hashjoin algorithm (without the windowing
feature) to compute the join of two input streams on a common
attribute, using the CoFlatMap function and state. After the join
operator (which has parallelism p = #processors) I have a map
operation (with parallelism 1) where I use the Meter component to
measure the average throughput of the join operation. Finally, I use a
DiscardingSink() as I only care about the throughput and the final count of
the join's result. I maintain two throughput values: the MAX avg value
I have ever seen and the AVG avg value I have seen.
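
For illustration only, the MAX/AVG bookkeeping described above could look
roughly like the sketch below. The class and method names (ThroughputStats,
record, max, mean) are hypothetical and not taken from the actual job; each
call to record() is assumed to receive one average-rate reading from the Meter.

```java
// Hypothetical helper: tracks the largest and the mean of the average-rate
// readings taken from a meter. Not part of Flink; names are illustrative.
public class ThroughputStats {
    private double max = Double.NEGATIVE_INFINITY; // largest reading so far
    private double sum = 0.0;                      // sum of all readings
    private long count = 0;                        // number of readings

    // Record one average-rate reading (e.g. records per second).
    public void record(double avgRate) {
        if (avgRate > max) {
            max = avgRate;
        }
        sum += avgRate;
        count++;
    }

    // The "MAX avg" value: highest reading seen, or 0 if none was recorded.
    public double max() {
        return count == 0 ? 0.0 : max;
    }

    // The "AVG avg" value: arithmetic mean of all readings, or 0 if none.
    public double mean() {
        return count == 0 ? 0.0 : sum / count;
    }

    public static void main(String[] args) {
        ThroughputStats stats = new ThroughputStats();
        stats.record(10.0);
        stats.record(30.0);
        stats.record(20.0);
        System.out.println("max=" + stats.max() + " mean=" + stats.mean());
    }
}
```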

I am running on a server with 48 processors and I expect throughput to get
higher when the parallelism p becomes > 1. The same input stream is used in
all cases. 

However, as you can see in the attached Excel file, not only does the throughput
not increase as p increases, but the time for the Flink job
to execute increases as well.

I have also read this:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-latency-throughput-td13170.html
where Kostas Kloudas implied that Flink is not optimized for
multiprocessor execution.

I am wondering whether this issue has to do with 1) the way I am measuring
throughput, or 2) Flink internals that are not optimized for
multiprocessor architectures.

Any ideas or comments are welcome.

Thanks in advance.

Best,
Max

experiments8_11_17.xlsx
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1161/experiments8_11_17.xlsx>  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Weird performance on custom Hashjoin w.r.t. parallelism

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Yes, as you correctly analysed, parallelism 1 was causing problems, because it meant that all of the records had to be gathered over the network from all of the task managers. Keep in mind that even if you increase the parallelism to "p", every change in parallelism can slow down your application, because events will have to be redistributed, which in most cases means network transfers.

For measuring throughput you could use the metrics already defined in Flink:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html

You can get the list of vertices of your job:
http://<web-ui-url>:8081/jobs/<job-id>/vertices
Then the statistics for one vertex:
http://<web-ui-url>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics

For example
http://localhost:8081/jobs/34c6f7d00cf9b3ebfff4d94ad465eb23/vertices
http://localhost:8081/jobs/34c6f7d00cf9b3ebfff4d94ad465eb23/vertices/3d144c2a0fc19115f5f075ba85deac26/metrics
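
As a rough sketch, the two URLs above can be built and queried from Java
(11 or later) with the standard HttpClient. The class and method names
(FlinkMetricsUrls, verticesUrl, metricsUrl, fetch) are made up for this
example, and the job and vertex IDs are simply the ones from the example URLs;
a real job will have different IDs.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative helper, not part of Flink: builds the REST URLs shown above
// and optionally fetches them as plain strings.
public class FlinkMetricsUrls {
    // URL that lists the vertices of a job.
    public static String verticesUrl(String baseUrl, String jobId) {
        return baseUrl + "/jobs/" + jobId + "/vertices";
    }

    // URL that lists the metrics of one vertex of a job.
    public static String metricsUrl(String baseUrl, String jobId, String vertexId) {
        return verticesUrl(baseUrl, jobId) + "/" + vertexId + "/metrics";
    }

    // Issues a GET request and returns the response body.
    public static String fetch(String url) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081";               // assumed web UI address
        String job = "34c6f7d00cf9b3ebfff4d94ad465eb23";     // ID from the example URL
        String vertex = "3d144c2a0fc19115f5f075ba85deac26";  // ID from the example URL
        String url = metricsUrl(base, job, vertex);
        System.out.println(url);
        // Only contact the server when explicitly asked: java FlinkMetricsUrls --fetch
        if (args.length > 0 && args[0].equals("--fetch")) {
            System.out.println(fetch(url));
        }
    }
}
```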

You can also try to aggregate them:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#rest-api-integration
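
One possible way to aggregate on the client side is to sum a per-subtask rate
over all subtasks of the join vertex. The sketch below assumes that the vertex
metrics endpoint, queried with ?get=<comma-separated metric ids>, returns a
JSON array of entries shaped like {"id":"0.numRecordsOutPerSecond","value":"..."},
where the leading number is the subtask index; please check this against the
responses of your own Flink version. The class name ThroughputAggregator and
the regex-based parsing are illustrative choices to keep the example free of
dependencies; a real JSON parser would be more robust.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper, not part of Flink: sums one task-level metric over
// all subtasks found in a metrics response.
public class ThroughputAggregator {
    // Assumed entry shape: {"id":"<subtask>.<metric>","value":"<number>"}
    private static final Pattern ENTRY = Pattern.compile(
        "\\{\\s*\"id\"\\s*:\\s*\"(\\d+)\\.([^\"]+)\"\\s*,"
        + "\\s*\"value\"\\s*:\\s*\"([^\"]+)\"\\s*\\}");

    // Adds up the values of every entry whose metric name equals metricName.
    // Entries for other metrics (or operator-scoped ids) are ignored.
    public static double sumAcrossSubtasks(String json, String metricName) {
        double total = 0.0;
        Matcher m = ENTRY.matcher(json);
        while (m.find()) {
            if (m.group(2).equals(metricName)) {
                total += Double.parseDouble(m.group(3));
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Hand-written sample response, not real measurements.
        String sample = "[{\"id\":\"0.numRecordsOutPerSecond\",\"value\":\"1000.0\"},"
                      + "{\"id\":\"1.numRecordsOutPerSecond\",\"value\":\"1500.5\"},"
                      + "{\"id\":\"1.numRecordsInPerSecond\",\"value\":\"9.0\"}]";
        System.out.println(sumAcrossSubtasks(sample, "numRecordsOutPerSecond"));
    }
}
```

Summing the per-subtask rates gives an operator-level throughput without
funnelling all records through a parallelism-1 operator inside the job.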

Piotrek

> On 9 Nov 2017, at 07:53, m@xi <ma...@gmail.com> wrote:
> 
> Hello!
> 
> I found out that the cause of the problem was the map with parallelism 1
> that I have after the parallel join.
> When I changed it to .map(new MyMapMeter).setParallelism(p), the completion
> time decreases as I increase the parallelism p, which is
> reasonable. The map was a bottleneck in my parallel execution plan, but I
> had it this way in order to measure a valid average throughput.
> 
> So, my question is the following: 
> 
> How can I measure the average throughput of my parallel join operation
> properly?
> 
> Best,
> Max
> 


Re: Weird performance on custom Hashjoin w.r.t. parallelism

Posted by "m@xi" <ma...@gmail.com>.
Hello!

I found out that the cause of the problem was the map with parallelism 1
that I have after the parallel join.
When I changed it to .map(new MyMapMeter).setParallelism(p), the completion
time decreases as I increase the parallelism p, which is
reasonable. The map was a bottleneck in my parallel execution plan, but I
had it this way in order to measure a valid average throughput.

So, my question is the following: 

How can I measure the average throughput of my parallel join operation
properly?

Best,
Max


