Posted to user@flink.apache.org by Philip Lee <ph...@gmail.com> on 2015/12/01 21:34:31 UTC

Hello, the performance of apply function after join

Hello,

I have some questions about the performance of the apply function after a
join.

For your information, I am running a Flink job on a cluster of 9 machines
with 48 cores each. I am working on a benchmark comparing Flink, Spark SQL,
and Hive.

I tried to optimize the *join function with a hint* for better performance.
I want to increase the performance as much as possible.

Here are my questions:
1) In the job progress log, the apply() after the join seems to take quite
a long time. Would I get better performance if I did not use apply() to
format the tuples? I could just specify the column numbers instead of using
apply().

2) When using a *join with a hint* such as Huge or Tiny, is there an ideal
ratio between the sizes of the two tables? Currently, if one table is 10
times bigger than the other, I use the join with a hint. Otherwise, I
usually use the general join().

Best,
Phil

Re: Hello, the performance of apply function after join

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Phil,

An apply method after a join runs pipelined with the join, i.e., it starts
processing when the first join result is emitted and finishes after it has
handled the last join result.
Unless the logic in your apply function is terribly complex, this should be
OK. If you do not specify an apply method, a default method will be used
which returns a Tuple2(left, right).
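
To illustrate the two cases, here is a minimal sketch assuming the Flink
Scala DataSet API of that era. The data sets `customers` and `orders` and
their contents are hypothetical and only serve the example.

```scala
import org.apache.flink.api.scala._

object JoinApplySketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical sample data: (id, name) and (id, amount).
    val customers: DataSet[(Int, String)] =
      env.fromElements((1, "alice"), (2, "bob"))
    val orders: DataSet[(Int, Double)] =
      env.fromElements((1, 9.5), (1, 3.0), (2, 7.25))

    // Without apply: the default join function emits a pair (left, right).
    val pairs: DataSet[((Int, String), (Int, Double))] =
      customers.join(orders).where(0).equalTo(0)

    // With apply: the function is called once per matching pair, as the
    // join emits it, and here only reshapes the result into a flat tuple.
    val flat: DataSet[(Int, String, Double)] =
      customers.join(orders).where(0).equalTo(0)
        .apply { (c: (Int, String), o: (Int, Double)) => (c._1, c._2, o._2) }

    pairs.print()
    flat.print()
  }
}
```

Because the apply function runs pipelined with the join, the time shown for
it in the progress log mostly reflects how long the join itself keeps
producing results.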

Regarding the join hints, there is no general rule for when to use
joinWithHuge / joinWithTiny. It depends on the number of machines, machine
specs, number of records, record size, etc.
If you use joinWithHuge/Tiny, the smaller side will be broadcast to every
node and each parallel partition will hold the full relation in memory,
i.e., if the smaller side is 10GB, you need at least 10GB for each task
manager slot. So this should only be used if the smaller side is *really*
small.
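
As a rough worked example of that memory requirement, the sketch below
multiplies the broadcast size by the number of slots. It assumes one task
manager slot per core on the 9 x 48-core cluster from the question, which is
an assumption and not something stated in the thread. The object and method
names are made up for illustration.

```scala
object BroadcastMemorySketch {
  // Each slot holds a full copy of the broadcast side.
  def perMachineGb(smallSideGb: Long, slotsPerMachine: Int): Long =
    smallSideGb * slotsPerMachine

  def clusterGb(smallSideGb: Long, machines: Int, slotsPerMachine: Int): Long =
    perMachineGb(smallSideGb, slotsPerMachine) * machines

  def main(args: Array[String]): Unit = {
    val machines = 9
    val slotsPerMachine = 48 // assumption: one slot per core
    val smallSideGb = 10L    // the 10GB example from the text

    println(s"per machine: ${perMachineGb(smallSideGb, slotsPerMachine)} GB")
    println(s"cluster-wide: ${clusterGb(smallSideGb, machines, slotsPerMachine)} GB")
  }
}
```

Under these assumptions a 10GB broadcast side would need 480GB per machine
and 4320GB across the cluster, which shows why the broadcast side has to be
very small.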

The join method also lets you specify more fine-grained hints such as:

small.join(large, JoinHint.REPARTITION_HASH_SECOND)

which will execute the join by shuffling both inputs and building a hash
table on the partition of the second input.
If you want to optimize for performance, you should try both hints:
REPARTITION_HASH_*small* and BROADCAST_HASH_*small*, where *small* stands
for FIRST or SECOND, whichever input is the smaller one.
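
A minimal sketch of trying both hints, assuming the Flink Scala DataSet API
and that the smaller input is the first one (hence the *_FIRST variants).
The data sets and the `timed` helper are hypothetical. Timings on toy data
like this are meaningless, so run it on the real tables.

```scala
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.scala._

object JoinHintSketch {
  // Hypothetical helper: count() triggers execution of the job, so this
  // returns the number of records and the wall-clock time in milliseconds.
  def timed[T](ds: DataSet[T]): (Long, Long) = {
    val start = System.nanoTime()
    val n = ds.count()
    (n, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical inputs; in practice these would be the benchmark tables.
    val small: DataSet[(Int, String)] =
      env.fromElements((1, "a"), (2, "b"))
    val large: DataSet[(Int, Double)] =
      env.fromElements((1, 1.0), (1, 2.0), (2, 3.0), (3, 4.0))

    // Shuffle both inputs and build the hash table on the first (small) one.
    val repartitioned =
      small.join(large, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)

    // Broadcast the first (small) input and build the hash table on it.
    val broadcast =
      small.join(large, JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0)

    println(s"repartition: ${timed(repartitioned)}")
    println(s"broadcast:   ${timed(broadcast)}")
  }
}
```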

Best, Fabian
