Posted to user@beam.apache.org by Jacob Marble <jm...@kochava.com> on 2018/02/12 21:53:38 UTC

working with hot keys

When joining (Join.leftOuterJoin, etc.) a PCollection<K, V1> to a
PCollection<K, V2>, and K:V1 contains hot keys, my pipeline gets very slow.
This can stretch processing time from hours to days.

Reading this blog post
<https://cloud.google.com/blog/big-data/2016/02/writing-dataflow-pipelines-with-scalability-in-mind>
I can see some thought has already been given to this problem:
"To address this, we allow you to provide extra parallelism hints using the
Combine.PerKey.withHotKeyFanout or Combine.Globally.withFanout. These
operations will create an extra step in your pipeline to pre-aggregate the
data on many machines before performing the final aggregation on the target
machines."

(1 of 2)

These two solutions, Combine.PerKey.withHotKeyFanout or
Combine.Globally.withFanout, do not help with a join (CoGBK) operation,
however. So, I solved my problem with these stages before and after the
join operation, effectively joining K:Iterable<V1> with K:V2:

kvIterable1 = kv1.apply("GBK to mitigate hot keys", GroupByKey.<K,
V1>create())

Join.someJoin(kvIterable1, kv2)
        .apply(Values.create())
        .apply("undo hot key GBK",
            ParDo
                .of(new DoFn<KV<Iterable<V1>, V2>, KV<V1, V2>>() {
                  @ProcessElement
                  public void fanout(ProcessContext context) {
                    for (V1 v1 : context.element().getKey()) {
                      context.output(KV.of(v1,
context.element().getValue()));
                    }
                  }
                }))

Does that look sane to people who have been working with Beam for a long
time? It has worked well for us over the last two months or so.

(2 of 2)

Lately, the size of the value has grown too large. It took some effort to
figure out the problem, which manifested as an
ArrayIndexOutOfBoundsException emitted from RandomAccessData.write().
Here's the follow-up solution, only changing the first half of the above
solution:

kvIterable1 = kv1
    .apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
    .apply("partition grouped values",
        ParDo.of(new DoFn<KV<K, Iterable<V1>>, KV<K, Iterable<V1>>>() {
          @ProcessElement
          public void partition(ProcessContext context) {
            K k = context.element().getKey();
            Iterable<V1> v1Iterable = context.element().getValue();
            // Guava's Iterables.partition splits the grouped values into
            // chunks of at most 1,000,000 elements per output KV.
            for (List<V1> partition : Iterables.partition(v1Iterable, 1000000)) {
              context.output(KV.<K, Iterable<V1>>of(k, partition));
            }
          }
        }));

Again, is this sane? Initial testing suggests this is a good solution.

Jacob

Re: working with hot keys

Posted by Jacob Marble <jm...@kochava.com>.
After implementing this, I'm surprised how long it takes to complete
View.AsMap. GBKaSVForKeys and ToIsmMetadataRecordForKey seem to be the
bottleneck; the GCE instance group is using 0.5% CPU and about 8M/s network.

Browsing the implementation, I think this is sorting the keys, which makes
sense, and makes me think there likely isn't room for performance
improvement.

The job is going to take much longer now, no matter how fast the new
pseudo-join goes. The source of this view is a Bigtable, and the table key
is the same as the join key. I wonder if there's a way to leverage that.

Jacob

On Tue, Feb 13, 2018 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:

> Both are doing the same thing effectively by loading the entire iterable
> into memory in the first case and the partitioned iterable into memory in
> the second case.
>
> The side input performance varies a lot depending on whether you're running
> a pipeline with bounded or unbounded PCollections, the PCollection sizes,
> and the side input access pattern.
>
> On Tue, Feb 13, 2018 at 9:08 AM, Jacob Marble <jm...@kochava.com> wrote:
>
>> On Mon, Feb 12, 2018 at 3:59 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> The optimization that you have done is that you have forced the V1
>>> iterable to reside in memory completely since it is now counted as a single
>>> element. This will fall apart as soon as your V1 iterable exceeds memory.
>>> Runners like Dataflow allow re-iteration of a GBK/CoGBK result allowing
>>> for the GBK/CoGBK result to exceed the size of memory and this currently
>>> only functions at the first level within the value iterable, meaning that
>>> the entire Iterable<V1> is treated as a single value in your
>>> Join.someJoin. You should see similar performance if you take all the
>>> V1s out of the CoGBK and "copy" it into an arraylist inside your DoFn and
>>> then walk the V2 iterable and the in memory array list performing the outer
>>> join. It will also likely be easier to reason about. Note that Dataflow
>>> doesn't do this in a great way and causes the re-iteration to happen many
>>> more times than it should need to, which is why your perf numbers are
>>> ballooning.
>>>
>>
>> Not sure, are you referring to (1) or (2) of my original?
>>
>> So (2) did fail with full production load. The join has 2+ billion
>> elements on both sides.
>>
>>
>>> Alternatively, have you tried putting either of the PCollection<K, V (V1
>>> or V2)> into a multimap side input and then just doing a GBK on the other
>>> PCollection<K, V (V2 or V1)> followed by a DoFn that joins the two together
>>> with the multimap side input?
>>> The choice of whether V1 or V2 works better in the side input depends on
>>> the sizes of the relative PCollections and whether the working set of the
>>> PCollection can be cached in memory (good for side input) or the GBK
>>> PCollection is sparse enough that if everything is a cache miss it won't
>>> matter.
>>>
>>
>> I'll try this. K:V2 has unique keys and doesn't change a lot from
>> day-to-day, so I'll make that a side input. Should I expect this method to
>> perform significantly slower than Join.someJoin/CoGBK?
>>

Re: working with hot keys

Posted by Lukasz Cwik <lc...@google.com>.
Both are effectively doing the same thing, by loading the entire iterable
into memory in the first case and the partitioned iterable into memory in
the second case.

The side input performance varies a lot depending on whether you're running
a pipeline with bounded or unbounded PCollections, the PCollection sizes,
and the side input access pattern.

On Tue, Feb 13, 2018 at 9:08 AM, Jacob Marble <jm...@kochava.com> wrote:

> On Mon, Feb 12, 2018 at 3:59 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> The optimization that you have done is that you have forced the V1
>> iterable to reside in memory completely since it is now counted as a single
>> element. This will fall apart as soon as your V1 iterable exceeds memory.
>> Runners like Dataflow allow re-iteration of a GBK/CoGBK result allowing
>> for the GBK/CoGBK result to exceed the size of memory and this currently
>> only functions at the first level within the value iterable, meaning that
>> the entire Iterable<V1> is treated as a single value in your
>> Join.someJoin. You should see similar performance if you take all the
>> V1s out of the CoGBK and "copy" it into an arraylist inside your DoFn and
>> then walk the V2 iterable and the in memory array list performing the outer
>> join. It will also likely be easier to reason about. Note that Dataflow
>> doesn't do this in a great way and causes the re-iteration to happen many
>> more times than it should need to, which is why your perf numbers are
>> ballooning.
>>
>
> Not sure, are you referring to (1) or (2) of my original?
>
> So (2) did fail with full production load. The join has 2+ billion
> elements on both sides.
>
>
>> Alternatively, have you tried putting either of the PCollection<K, V (V1
>> or V2)> into a multimap side input and then just doing a GBK on the other
>> PCollection<K, V (V2 or V1)> followed by a DoFn that joins the two together
>> with the multimap side input?
>> The choice of whether V1 or V2 works better in the side input depends on
>> the sizes of the relative PCollections and whether the working set of the
>> PCollection can be cached in memory (good for side input) or the GBK
>> PCollection is sparse enough that if everything is a cache miss it won't
>> matter.
>>
>
> I'll try this. K:V2 has unique keys and doesn't change a lot from
> day-to-day, so I'll make that a side input. Should I expect this method to
> perform significantly slower than Join.someJoin/CoGBK?
>

Re: working with hot keys

Posted by Jacob Marble <jm...@kochava.com>.
On Mon, Feb 12, 2018 at 3:59 PM, Lukasz Cwik <lc...@google.com> wrote:

> The optimization that you have done is that you have forced the V1
> iterable to reside in memory completely since it is now counted as a single
> element. This will fall apart as soon as your V1 iterable exceeds memory.
> Runners like Dataflow allow re-iteration of a GBK/CoGBK result allowing
> for the GBK/CoGBK result to exceed the size of memory and this currently
> only functions at the first level within the value iterable, meaning that
> the entire Iterable<V1> is treated as a single value in your
> Join.someJoin. You should see similar performance if you take all the V1s
> out of the CoGBK and "copy" it into an arraylist inside your DoFn and then
> walk the V2 iterable and the in memory array list performing the outer
> join. It will also likely be easier to reason about. Note that Dataflow
> doesn't do this in a great way and causes the re-iteration to happen many
> more times than it should need to, which is why your perf numbers are
> ballooning.
>

Not sure, are you referring to (1) or (2) of my original?

So (2) did fail with full production load. The join has 2+ billion elements
on both sides.


> Alternatively, have you tried putting either of the PCollection<K, V (V1
> or V2)> into a multimap side input and then just doing a GBK on the other
> PCollection<K, V (V2 or V1)> followed by a DoFn that joins the two together
> with the multimap side input?
> The choice of whether V1 or V2 works better in the side input depends on
> the sizes of the relative PCollections and whether the working set of the
> PCollection can be cached in memory (good for side input) or the GBK
> PCollection is sparse enough that if everything is a cache miss it won't
> matter.
>

I'll try this. K:V2 has unique keys and doesn't change a lot from
day-to-day, so I'll make that a side input. Should I expect this method to
perform significantly slower than Join.someJoin/CoGBK?
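
Roughly what I have in mind, as a sketch only (View.asMap should work since
K:V2 keys are unique; the names are illustrative, a null V2 stands in for the
missing right side, and no GBK on K:V1 is needed since each element just looks
up its key):

// K:V2 as a map side input; join each K:V1 element against it in a DoFn.
final PCollectionView<Map<K, V2>> v2View =
    kv2.apply("V2 as map side input", View.<K, V2>asMap());

kv1.apply("join against V2 side input",
    ParDo.of(new DoFn<KV<K, V1>, KV<V1, V2>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        V2 v2 = c.sideInput(v2View).get(c.element().getKey());
        // Left outer join: v2 is null when the key is absent from K:V2.
        c.output(KV.of(c.element().getValue(), v2));
      }
    }).withSideInputs(v2View));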


Re: working with hot keys

Posted by Lukasz Cwik <lc...@google.com>.
The optimization you have made is that you have forced the V1 iterable to
reside entirely in memory, since it is now counted as a single element.
This will fall apart as soon as your V1 iterable exceeds memory.
Runners like Dataflow allow re-iteration of a GBK/CoGBK result, allowing
the GBK/CoGBK result to exceed the size of memory, but this currently only
functions at the first level within the value iterable, meaning that the
entire Iterable<V1> is treated as a single value in your Join.someJoin. You
should see similar performance if you take all the V1s out of the CoGBK,
"copy" them into an ArrayList inside your DoFn, and then walk the V2
iterable and the in-memory ArrayList performing the outer join. It will
also likely be easier to reason about. Note that Dataflow doesn't do this
in a great way and causes the re-iteration to happen many more times than
it should need to, which is why your perf numbers are ballooning.
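
Roughly, as a sketch (the tag names, the Guava helpers, and the null
placeholder for the outer side are illustrative, not prescriptive):

// CoGBK both sides, then buffer the V1s in an ArrayList per key and walk
// the V2 iterable once.
final TupleTag<V1> v1Tag = new TupleTag<>();
final TupleTag<V2> v2Tag = new TupleTag<>();

KeyedPCollectionTuple.of(v1Tag, kv1)
    .and(v2Tag, kv2)
    .apply("CoGBK", CoGroupByKey.<K>create())
    .apply("outer join in DoFn",
        ParDo.of(new DoFn<KV<K, CoGbkResult>, KV<V1, V2>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            CoGbkResult result = c.element().getValue();
            // Copy the V1 side into memory once per key (Guava Lists).
            List<V1> v1s = Lists.newArrayList(result.getAll(v1Tag));
            Iterable<V2> v2s = result.getAll(v2Tag);
            if (Iterables.isEmpty(v2s)) {
              // Left outer join: no V2 for this key, emit a null placeholder.
              for (V1 v1 : v1s) {
                c.output(KV.of(v1, (V2) null));
              }
              return;
            }
            for (V2 v2 : v2s) {
              for (V1 v1 : v1s) {
                c.output(KV.of(v1, v2));
              }
            }
          }
        }));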

Alternatively, have you tried putting either of the PCollection<K, V (V1 or
V2)> into a multimap side input and then just doing a GBK on the other
PCollection<K, V (V2 or V1)>, followed by a DoFn that joins the two together
with the multimap side input?
The choice of whether V1 or V2 works better in the side input depends on the
relative sizes of the PCollections and whether the working set of the side
input PCollection can be cached in memory (good for a side input) or the GBK
PCollection is sparse enough that it won't matter if everything is a cache
miss.
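
As a sketch of that side input variant (assuming kv2 is the side that fits
in the side input; the view name and the null handling are illustrative):

// kv2 as a multimap side input; kv1 grouped by key and joined against it.
final PCollectionView<Map<K, Iterable<V2>>> v2View =
    kv2.apply("V2 as multimap side input", View.<K, V2>asMultimap());

kv1.apply("GBK", GroupByKey.<K, V1>create())
    .apply("join against side input",
        ParDo.of(new DoFn<KV<K, Iterable<V1>>, KV<V1, V2>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Iterable<V2> v2s = c.sideInput(v2View).get(c.element().getKey());
            if (v2s == null) {
              return; // or emit KV.of(v1, null) here for a left outer join
            }
            for (V1 v1 : c.element().getValue()) {
              for (V2 v2 : v2s) {
                c.output(KV.of(v1, v2));
              }
            }
          }
        }).withSideInputs(v2View));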

