Posted to dev@beam.apache.org by Udi Meiri <eh...@google.com> on 2018/11/08 18:52:14 UTC

BEAM-6018: memory leak in thread pool instantiation

Hi,
I've identified a memory leak when GcsUtil.java instantiates a
ThreadPoolExecutor (https://issues.apache.org/jira/browse/BEAM-6018).
The code uses the getExitingExecutorService
<https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L551> wrapper,
which leaks memory. The question is: why is that wrapper necessary
if executor.shutdown() is later called unconditionally?
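
One plausible mechanism for a leak of this kind can be sketched with the JDK alone. The sketch assumes the wrapper registers one JVM shutdown hook per executor; `registerExitHook` is a hypothetical stand-in written for illustration, not Guava's actual implementation. Because the hook captures the executor, the executor stays strongly reachable until the JVM exits, even after shutdown() has been called.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ShutdownHookLeakSketch {
  /**
   * Hypothetical stand-in for an "exiting" wrapper: registers one JVM
   * shutdown hook per executor and returns the hook thread.
   */
  static Thread registerExitHook(ExecutorService executor) {
    // The hook holds a strong reference to the executor.
    Thread hook = new Thread(executor::shutdown);
    Runtime.getRuntime().addShutdownHook(hook);
    return hook;
  }

  /**
   * Creates a pool, registers a hook for it, shuts the pool down, and
   * reports whether the hook was still registered afterwards.
   */
  static boolean hookSurvivesShutdown() {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Thread hook = registerExitHook(executor);
    executor.shutdown();
    // removeShutdownHook returns true only if the hook was still registered,
    // which means the runtime was still holding on to the executor.
    return Runtime.getRuntime().removeShutdownHook(hook);
  }

  public static void main(String[] args) {
    System.out.println(
        "hook still registered after shutdown(): " + hookSurvivesShutdown());
  }
}
```

If this is the mechanism, calling the wrapper once per batch operation would accumulate one retained hook and one retained executor per call for the lifetime of the process.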

Re: BEAM-6018: memory leak in thread pool instantiation

Posted by Lukasz Cwik <lc...@google.com>.
Thanks for the context, Dan; that was helpful.

On Fri, Nov 9, 2018 at 10:09 AM Udi Meiri <eh...@google.com> wrote:

> The reasoning for the unbounded thread pool is explained as:
> /* The SDK requires an unbounded thread pool because a step may create X writers
> * each requiring their own thread to perform the writes otherwise a writer may
> * block causing deadlock for the step because the writers buffer is full.
> * Also, the MapTaskExecutor launches the steps in reverse order and completes
> * them in forward order thus requiring enough threads so that each step's writers
> * can be active.
> */
>
>
> https://github.com/apache/beam/blob/17c2da6d981cae9f233aea1e2d6d64259362dd73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java#L133-L138
>
> On Thu, Nov 8, 2018 at 11:41 PM Dan Halperin <dh...@apache.org> wrote:
>
>>
>>> On Thu, Nov 8, 2018 at 2:12 PM Udi Meiri <eh...@google.com> wrote:
>>>
>>>> Both options risk delaying worker shutdown if the executor's shutdown()
>>>> hasn't been called, which is I guess why the executor in GcsOptions.java
>>>> creates daemon threads.
>>>>
>>>
>> My guess (and it really is a guess at this point) is that this was a fix
>> for DirectRunner issues - want that to exit quickly!
>>
>>
>>>
>>>> On Thu, Nov 8, 2018 at 1:02 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Not certain, it looks like we should have been caching the executor
>>>>> within the GcsUtil as a static instance instead of creating one each time.
>>>>> Could have been missed during code review / slow code changes over time.
>>>>> GcsUtil is not well "loved".
>>>>>
>>>>> On Thu, Nov 8, 2018 at 11:00 AM Udi Meiri <eh...@google.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I've identified a memory leak when GcsUtil.java instantiates a
>>>>>> ThreadPoolExecutor (https://issues.apache.org/jira/browse/BEAM-6018).
>>>>>> The code uses the getExitingExecutorService
>>>>>> <https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L551> wrapper,
>>>>>> which leaks memory. The question is, why is that wrapper necessary
>>>>>> if executor.shutdown(); is later unconditionally called?
>>>>>>
>>>>>

Re: BEAM-6018: memory leak in thread pool instantiation

Posted by Udi Meiri <eh...@google.com>.
The reasoning for the unbounded thread pool is explained as:
/* The SDK requires an unbounded thread pool because a step may create X writers
 * each requiring their own thread to perform the writes otherwise a writer may
 * block causing deadlock for the step because the writers buffer is full.
 * Also, the MapTaskExecutor launches the steps in reverse order and completes
 * them in forward order thus requiring enough threads so that each step's writers
 * can be active.
 */

https://github.com/apache/beam/blob/17c2da6d981cae9f233aea1e2d6d64259362dd73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java#L133-L138
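
The deadlock argument in that comment can be illustrated with a small sketch. It assumes an unbounded pool built from a ThreadPoolExecutor with a SynchronousQueue and daemon threads; that construction and all names below are illustrative assumptions, not a copy of the GcsOptions code. Each simulated "writer" waits until every other writer has started, so the group only finishes if every writer gets its own thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class UnboundedPoolSketch {
  /** Threads from this factory never keep the JVM alive. */
  static ThreadFactory daemonFactory() {
    return runnable -> {
      Thread thread = new Thread(runnable);
      thread.setDaemon(true);
      return thread;
    };
  }

  /** No upper bound on threads: a task never waits in a queue. */
  static ExecutorService newUnboundedPool() {
    return new ThreadPoolExecutor(
        0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<>(), daemonFactory());
  }

  /**
   * Submits tasks that each wait for all the others to start.
   * Returns true if every task finished within the timeout.
   */
  static boolean allWritersFinish(
      ExecutorService pool, int writers, long timeoutMillis) {
    CountDownLatch allStarted = new CountDownLatch(writers);
    CountDownLatch allDone = new CountDownLatch(writers);
    for (int i = 0; i < writers; i++) {
      pool.execute(() -> {
        allStarted.countDown();
        try {
          if (allStarted.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            allDone.countDown();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    try {
      return allDone.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService unbounded = newUnboundedPool();
    System.out.println(
        "all writers finished: " + allWritersFinish(unbounded, 16, 5_000));
    unbounded.shutdown();
  }
}
```

With a pool capped below the number of writers, the first writers would occupy every thread while waiting for writers that can never start, which is the blocking scenario the comment describes.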

On Thu, Nov 8, 2018 at 11:41 PM Dan Halperin <dh...@apache.org> wrote:

>
>> On Thu, Nov 8, 2018 at 2:12 PM Udi Meiri <eh...@google.com> wrote:
>>
>>> Both options risk delaying worker shutdown if the executor's shutdown()
>>> hasn't been called, which is I guess why the executor in GcsOptions.java
>>> creates daemon threads.
>>>
>>
> My guess (and it really is a guess at this point) is that this was a fix
> for DirectRunner issues - want that to exit quickly!
>
>
>>
>>> On Thu, Nov 8, 2018 at 1:02 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Not certain, it looks like we should have been caching the executor
>>>> within the GcsUtil as a static instance instead of creating one each time.
>>>> Could have been missed during code review / slow code changes over time.
>>>> GcsUtil is not well "loved".
>>>>
>>>> On Thu, Nov 8, 2018 at 11:00 AM Udi Meiri <eh...@google.com> wrote:
>>>>
>>>>> Hi,
>>>>> I've identified a memory leak when GcsUtil.java instantiates a
>>>>> ThreadPoolExecutor (https://issues.apache.org/jira/browse/BEAM-6018).
>>>>> The code uses the getExitingExecutorService
>>>>> <https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L551> wrapper,
>>>>> which leaks memory. The question is, why is that wrapper necessary
>>>>> if executor.shutdown(); is later unconditionally called?
>>>>>
>>>>

Re: BEAM-6018: memory leak in thread pool instantiation

Posted by Dan Halperin <dh...@apache.org>.
>
>
> On Thu, Nov 8, 2018 at 2:12 PM Udi Meiri <eh...@google.com> wrote:
>
>> Both options risk delaying worker shutdown if the executor's shutdown()
>> hasn't been called, which is I guess why the executor in GcsOptions.java
>> creates daemon threads.
>>
>
My guess (and it really is a guess at this point) is that this was a fix
for DirectRunner issues - want that to exit quickly!


>
>> On Thu, Nov 8, 2018 at 1:02 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Not certain, it looks like we should have been caching the executor
>>> within the GcsUtil as a static instance instead of creating one each time.
>>> Could have been missed during code review / slow code changes over time.
>>> GcsUtil is not well "loved".
>>>
>>> On Thu, Nov 8, 2018 at 11:00 AM Udi Meiri <eh...@google.com> wrote:
>>>
>>>> Hi,
>>>> I've identified a memory leak when GcsUtil.java instantiates a
>>>> ThreadPoolExecutor (https://issues.apache.org/jira/browse/BEAM-6018).
>>>> The code uses the getExitingExecutorService
>>>> <https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L551> wrapper,
>>>> which leaks memory. The question is, why is that wrapper necessary
>>>> if executor.shutdown(); is later unconditionally called?
>>>>
>>>

Re: BEAM-6018: memory leak in thread pool instantiation

Posted by Dan Halperin <dh...@apache.org>.
Hey Udi,

Thanks for the commit comment
<https://github.com/apache/beam/commit/f08f21cdf4c067745a10b31a6481ed470f97dadc#r31216133>.
I'll try to dump any (old) mental context I have left.

We were trying to find the right point in a space of:

* enough parallelism to speed things up
    - more than 256 threads didn't seem to help with perf, and 256 was not
close to looking like a DDoS to GCS, so we were not in danger of quota limits
being imposed.
* enough batches to speed things up
    - batches -> fewer RPCs from the job to GCS, more RPCs inside GCS. 100
was again good for perf and good for quotas.
* small enough batches to spread the load
    - if you think about the multiple layers of fanout in RPC handling,
sending a batch RPC cuts out the first layer of load-balancing. The
endpoint that receives the batch itself handles all the individual
requests, and if that endpoint is slow or any individual request in the
batch is slow, the entire batch is slow. Prefer many batches in flight, so
as to not be limited by the performance of that single endpoint.

Coming to the question in the PR comment:
> Any reason not to use this.executorService?

I think the main reason not to use `this.executorService` is that we didn't
have constraints on that executor in terms of either upper or lower bounds on
parallelism, so it seemed safer to use our own with known limits.
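
The trade-off described above (a dedicated pool with a known thread limit, and many fixed-size batches in flight) might look roughly like the following sketch. The limits 256 and 100 come from this message; the class, the method names, and the placeholder "batch RPC" are assumptions made for illustration and do not reproduce GcsUtil.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BatchFanoutSketch {
  static final int MAX_THREADS = 256;    // upper bound on parallelism
  static final int MAX_BATCH_SIZE = 100; // requests per batch

  /** Splits items into consecutive batches of at most batchSize elements. */
  static <T> List<List<T>> partition(List<T> items, int batchSize) {
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < items.size(); start += batchSize) {
      int end = Math.min(start + batchSize, items.size());
      batches.add(new ArrayList<>(items.subList(start, end)));
    }
    return batches;
  }

  /** Runs one task per batch on a bounded pool; returns items processed. */
  static int processAll(List<String> paths) {
    ExecutorService pool = Executors.newFixedThreadPool(MAX_THREADS);
    try {
      List<Future<Integer>> pending = new ArrayList<>();
      for (List<String> batch : partition(paths, MAX_BATCH_SIZE)) {
        // Placeholder for a real batch RPC: just reports the batch size.
        Callable<Integer> task = batch::size;
        pending.add(pool.submit(task));
      }
      int total = 0;
      for (Future<Integer> result : pending) {
        total += result.get();
      }
      return total;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown(); // always release the pool's threads
    }
  }

  public static void main(String[] args) {
    List<String> paths = new ArrayList<>();
    for (int i = 0; i < 250; i++) {
      paths.add("gs://example-bucket/object-" + i); // hypothetical paths
    }
    System.out.println("batches: " + partition(paths, MAX_BATCH_SIZE).size());
    System.out.println("processed: " + processAll(paths));
  }
}
```

Keeping the batch size small relative to the total work means a slow endpoint delays only its own batch while the other batches proceed on other threads.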

Thanks for catching the memory leak though - we didn't at the time! I'll
defer to you (and especially Luke ;) on a good solution to fix.
Dan

On Thu, Nov 8, 2018 at 2:12 PM Udi Meiri <eh...@google.com> wrote:

> My thought was to use 1 executor per GcsUtil instance (or 1 per process as
> you suggest), with a possible performance hit since I don't know how often
> these batch copy and remove operations are called.
> The other option is to leave things as they mostly are, and only remove
> the call to getExitingExecutorService.
>
> Both options risk delaying worker shutdown if the executor's shutdown()
> hasn't been called, which is I guess why the executor in GcsOptions.java
> creates daemon threads.
>
> On Thu, Nov 8, 2018 at 1:02 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Not certain, it looks like we should have been caching the executor
>> within the GcsUtil as a static instance instead of creating one each time.
>> Could have been missed during code review / slow code changes over time.
>> GcsUtil is not well "loved".
>>
>> On Thu, Nov 8, 2018 at 11:00 AM Udi Meiri <eh...@google.com> wrote:
>>
>>> Hi,
>>> I've identified a memory leak when GcsUtil.java instantiates a
>>> ThreadPoolExecutor (https://issues.apache.org/jira/browse/BEAM-6018).
>>> The code uses the getExitingExecutorService
>>> <https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L551> wrapper,
>>> which leaks memory. The question is, why is that wrapper necessary
>>> if executor.shutdown(); is later unconditionally called?
>>>
>>

Re: BEAM-6018: memory leak in thread pool instantiation

Posted by Udi Meiri <eh...@google.com>.
My thought was to use 1 executor per GcsUtil instance (or 1 per process as
you suggest), with a possible performance hit since I don't know how often
these batch copy and remove operations are called.
The other option is to leave things as they mostly are, and only remove the
call to getExitingExecutorService.

Both options risk delaying worker shutdown if the executor's shutdown()
hasn't been called, which I guess is why the executor in GcsOptions.java
creates daemon threads.
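
The daemon-thread point can be sketched as follows. This is a minimal illustration using only the JDK; the factory, the pool size, and the thread-name prefix are hypothetical and not taken from GcsOptions.java. A pool whose workers are daemon threads does not hold the JVM open even if shutdown() is never called.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class DaemonPoolSketch {
  /** Creates named daemon threads, which never block JVM exit. */
  static ThreadFactory daemonFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread =
          new Thread(runnable, prefix + "-" + counter.incrementAndGet());
      thread.setDaemon(true);
      return thread;
    };
  }

  static ExecutorService newDaemonPool(int threads) {
    return Executors.newFixedThreadPool(threads, daemonFactory("gcs-sketch"));
  }

  /** Runs a task on the pool and reports whether its worker is a daemon. */
  static boolean workerIsDaemon(ExecutorService pool) {
    Callable<Boolean> task = () -> Thread.currentThread().isDaemon();
    try {
      return pool.submit(task).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = newDaemonPool(4);
    System.out.println("worker is daemon: " + workerIsDaemon(pool));
    // Deliberately no shutdown(): the JVM can still exit because the
    // pool's only live thread is a daemon.
  }
}
```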

On Thu, Nov 8, 2018 at 1:02 PM Lukasz Cwik <lc...@google.com> wrote:

> Not certain, it looks like we should have been caching the executor within
> the GcsUtil as a static instance instead of creating one each time. Could
> have been missed during code review / slow code changes over time. GcsUtil
> is not well "loved".
>
> On Thu, Nov 8, 2018 at 11:00 AM Udi Meiri <eh...@google.com> wrote:
>
>> Hi,
>> I've identified a memory leak when GcsUtil.java instantiates a
>> ThreadPoolExecutor (https://issues.apache.org/jira/browse/BEAM-6018).
>> The code uses the getExitingExecutorService
>> <https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L551> wrapper,
>> which leaks memory. The question is, why is that wrapper necessary
>> if executor.shutdown(); is later unconditionally called?
>>
>

Re: BEAM-6018: memory leak in thread pool instantiation

Posted by Lukasz Cwik <lc...@google.com>.
Not certain, but it looks like we should have been caching the executor within
GcsUtil as a static instance instead of creating one each time. This could
have been missed during code review or through slow code changes over time.
GcsUtil is not well "loved".
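
A static cached executor of the kind suggested here could be sketched as below. The class name, the thread limit, and the idle timeout are hypothetical choices made for illustration; none of them come from GcsUtil.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SharedExecutorSketch {
  // Hypothetical limit, chosen only for illustration.
  private static final int MAX_THREADS = 8;

  // Created once when the class is initialized and reused by every caller,
  // instead of building a new pool for each batch operation.
  private static final ExecutorService SHARED = createExecutor();

  private static ExecutorService createExecutor() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        MAX_THREADS, MAX_THREADS, 1L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());
    // Let idle threads exit so an unused pool does not keep the JVM alive.
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  static ExecutorService executor() {
    return SHARED;
  }

  public static void main(String[] args) throws Exception {
    Callable<Integer> task = () -> 6 * 7;
    System.out.println(executor().submit(task).get());
    System.out.println(executor() == executor()); // same instance each time
  }
}
```

Sharing one instance removes the per-call pool creation entirely, at the cost of all callers competing for the same fixed number of threads.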

On Thu, Nov 8, 2018 at 11:00 AM Udi Meiri <eh...@google.com> wrote:

> Hi,
> I've identified a memory leak when GcsUtil.java instantiates a
> ThreadPoolExecutor (https://issues.apache.org/jira/browse/BEAM-6018).
> The code uses the getExitingExecutorService
> <https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L551> wrapper,
> which leaks memory. The question is, why is that wrapper necessary
> if executor.shutdown(); is later unconditionally called?
>