Posted to dev@beam.apache.org by Yixing Zhang <to...@gmail.com> on 2020/01/17 19:29:16 UTC

Updating Metrics Counter in user defined thread

Hi Beam Committers,

I am a developer on the Beam Samza runner. We are currently seeing issues
where our users fail to update metrics from their own threads. I am
wondering if anyone has suggestions on this issue.

Problem:
MetricsContainer
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java>
is held in a ThreadLocal in MetricsEnvironment
<https://github.com/apache/beam/commits/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java>.
Whenever DelegatingCounter.inc()
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java>
is called, it looks up the MetricsContainer for the current thread and
updates the corresponding CounterCell. For the Samza runner, we have a
FnWithMetricsWrapper
<https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java>
that sets the MetricsContainer for the current thread before each DoFn is
run. However, if users create their own threads inside a ParDo function and
try to update metrics from those threads, the updates fail and they get the
error log "Unable to update metrics on the current thread...."
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java#L119>.

Example:

pipeline
    .apply(Create.of(inputData))
    .apply(ParDo.of(new DoFn<KV<String, String>, Void>() {
      @ProcessElement
      public void processElement(ProcessContext context) {
        // Runner-managed thread: a MetricsContainer is set, so this works.
        Metrics.counter("test", "counter1").inc();
        Thread thread = new Thread(() -> {
          // User-defined thread: no MetricsContainer is set, so this update fails.
          Metrics.counter("test", "counter2").inc();
        }, "a user-defined thread");
        thread.start();
      }
    }));

In this case, counter1 is updated but counter2 is not, because no
MetricsContainer has been set on the user-defined thread.
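The mechanism can be reproduced without Beam. Below is a minimal sketch in plain Java, assuming a simplified, hypothetical stand-in for MetricsEnvironment (the Container class, inc(), and run() are illustrative names, not Beam's API): the container lives in a ThreadLocal, so a thread started inside processElement sees no container and its increment is skipped.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for MetricsEnvironment; not Beam's API.
class ThreadLocalMetricsDemo {
  // Illustrative container holding a single counter cell.
  static final class Container {
    final AtomicLong counter = new AtomicLong();
  }

  // The current container is kept per thread, as described above.
  private static final ThreadLocal<Container> CONTAINER_FOR_THREAD = new ThreadLocal<>();

  // Looks up the container for the calling thread; if none is set, the
  // update is skipped and an error is logged.
  static boolean inc() {
    Container container = CONTAINER_FOR_THREAD.get();
    if (container == null) {
      System.err.println("Unable to update metrics on the current thread");
      return false;
    }
    container.counter.incrementAndGet();
    return true;
  }

  // Simulates one processElement call and returns the recorded count.
  static long run() {
    Container container = new Container();
    CONTAINER_FOR_THREAD.set(container); // done by the runner before the DoFn runs
    try {
      inc(); // like counter1: recorded on the runner-managed thread
      Thread user = new Thread(ThreadLocalMetricsDemo::inc, "a user-defined thread");
      user.start(); // like counter2: the new thread has no container
      try {
        user.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } finally {
      CONTAINER_FOR_THREAD.remove();
    }
    return container.counter.get();
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints 1: only the first increment is recorded
  }
}
```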

We have no control over user-defined threads, so it seems impossible for
our runner to set the MetricsContainer for them. Can someone give me
suggestions, from either a developer's or a user's perspective, on how to
make this use case work?

Thanks,
Yixing

Re: Updating Metrics Counter in user defined thread

Posted by Luke Cwik <lc...@google.com>.
I think any approach that allows asynchronous processing in another thread
needs to be holistic and go beyond counters. Many things that such a thread
may want to access (user state, side inputs, counters, producing output)
were intentionally set up not to be thread-safe because of the cost of
synchronization, so getting only counters to work will just relieve the
issues your users face until they hit the next roadblock.

Have you considered having Samza split the work into smaller bundles, which
it then processes on multiple threads?
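One possible reading of this suggestion, sketched in plain Java under our own assumptions (the class, the Container type, and the pool-based scheduling are hypothetical, not Samza or Beam code): the runner, rather than user code, owns the threads, so it can install a container on each worker thread before processing a bundle and report that bundle's count when it completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the runner processes bundles on its own thread pool.
class BundlePerThreadSketch {
  static final class Container {
    final AtomicLong counter = new AtomicLong();
  }

  static final ThreadLocal<Container> CURRENT = new ThreadLocal<>();

  // User code: increments a counter and never starts threads itself.
  static void processElement(int element) {
    Container container = CURRENT.get();
    if (container != null) {
      container.counter.incrementAndGet();
    }
  }

  // Runner side: each bundle gets its own container, installed on the worker
  // thread before processing and removed afterwards.
  static long run(List<List<Integer>> bundles, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<Long>> perBundle = new ArrayList<>();
      for (List<Integer> bundle : bundles) {
        perBundle.add(pool.submit(() -> {
          Container container = new Container();
          CURRENT.set(container);
          try {
            for (int element : bundle) {
              processElement(element);
            }
          } finally {
            CURRENT.remove(); // pool threads are reused across bundles
          }
          return container.counter.get(); // reported on bundle completion
        }));
      }
      long total = 0;
      for (Future<Long> result : perBundle) {
        total += result.get();
      }
      return total;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  // Builds `count` bundles of `size` elements each.
  static List<List<Integer>> sampleBundles(int count, int size) {
    List<List<Integer>> bundles = new ArrayList<>();
    for (int b = 0; b < count; b++) {
      List<Integer> bundle = new ArrayList<>();
      for (int i = 0; i < size; i++) {
        bundle.add(b * size + i);
      }
      bundles.add(bundle);
    }
    return bundles;
  }

  public static void main(String[] args) {
    System.out.println(run(sampleBundles(4, 5), 2)); // prints 20
  }
}
```

Because every increment happens on a thread the runner prepared, per-bundle attribution stays straightforward; the trade-off is that parallelism is decided by the runner rather than by the user's own threads.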

On Tue, Jan 21, 2020 at 5:25 PM Robert Bradshaw <ro...@google.com> wrote:

Re: Updating Metrics Counter in user defined thread

Posted by Robert Bradshaw <ro...@google.com>.
I'm curious how, in this setup, one would attribute counters when reporting
them on bundle completion. (Often more than one bundle is running
concurrently in a VM.)

Another idea: would it be possible (this would require some change to user
code, but not much) to pass a Counter object that is already bound to a
container to the incrementing code? A bind() call (or similar) would be made
when creating the thread (or the object that subsequently creates the
thread), and the bound Counter would then be used. There are still lifetime
issues to deal with...
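A minimal sketch of the bind() idea in plain Java, under our own assumptions: BoundCounter, bind(), and Container are hypothetical names, not existing Beam API. bind() runs on the runner-managed thread and captures that thread's container; the resulting object can then be incremented from any thread. The comment on inc() notes one of the lifetime issues, as we understand them.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a counter bound to a container; not Beam's API.
class BoundCounterSketch {
  static final class Container {
    final AtomicLong counter = new AtomicLong();
  }

  static final ThreadLocal<Container> CURRENT = new ThreadLocal<>();

  static final class BoundCounter {
    private final Container container;

    private BoundCounter(Container container) {
      this.container = container;
    }

    // Must be called on a thread whose container is set (e.g. in
    // processElement), before handing the counter to user-defined threads.
    static BoundCounter bind() {
      Container container = CURRENT.get();
      if (container == null) {
        throw new IllegalStateException("no container on the current thread");
      }
      return new BoundCounter(container);
    }

    // Safe from any thread. Lifetime caveat: an increment made after the
    // bundle's metrics have been reported would not be in that report.
    void inc() {
      container.counter.incrementAndGet();
    }
  }

  // Binds on the "DoFn" thread, then increments from several user threads.
  static long run(int userThreads) {
    Container container = new Container();
    CURRENT.set(container);
    try {
      BoundCounter counter = BoundCounter.bind();
      Thread[] workers = new Thread[userThreads];
      for (int i = 0; i < userThreads; i++) {
        workers[i] = new Thread(counter::inc, "user-thread-" + i);
        workers[i].start();
      }
      for (Thread worker : workers) {
        try {
          worker.join();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    } finally {
      CURRENT.remove();
    }
    return container.counter.get();
  }

  public static void main(String[] args) {
    System.out.println(run(3)); // prints 3
  }
}
```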

On Tue, Jan 21, 2020 at 4:24 PM Yixing Zhang <to...@gmail.com> wrote:

Re: Updating Metrics Counter in user defined thread

Posted by Yixing Zhang <to...@gmail.com>.
Thanks for the suggestions, Robert and Alex! Although the suggested
solutions may work, all of them require users to change their code, which
is not ideal. Creating new threads and updating metrics in callback
functions is pretty common for Samza runner users. We'd like to find a way
to enable this use case through changes on the runner side.

We'd like to propose a short-term solution for this use case. Here are the
steps:
1. In the MetricsEnvironment
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java>
class, add a static method "public static void setGlobalContainer(@Nullable
MetricsContainer container)" that runners can call to set a global metrics
container per JVM.
2. In the MetricsEnvironment class, modify the getCurrentContainer() method
to return the global container when the thread-local container is null and
the global container is not null.
3. Each runner can add a call to MetricsEnvironment.setGlobalContainer() in
its runner code to enable this feature. There will be no side effect if
MetricsEnvironment.setGlobalContainer() is not called.
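A minimal sketch of these steps in plain Java, assuming a simplified, hypothetical stand-in for MetricsEnvironment: only the setGlobalContainer/getCurrentContainer names come from the proposal, while the Container type and run() are illustrative, and this is not Beam's actual code. The thread-local container wins when present; otherwise the JVM-wide container, if any, receives the update. Updates that land in the global container carry no step name, which is the drawback the proposal mentions.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the proposed fallback; not Beam's actual code.
class GlobalFallbackSketch {
  static final class Container {
    final AtomicLong counter = new AtomicLong();
  }

  private static final ThreadLocal<Container> CONTAINER_FOR_THREAD = new ThreadLocal<>();

  // Step 1: a JVM-wide container that a runner may opt into (null = disabled).
  private static final AtomicReference<Container> GLOBAL_CONTAINER = new AtomicReference<>();

  static void setGlobalContainer(Container container) {
    GLOBAL_CONTAINER.set(container);
  }

  // Step 2: prefer the thread-local container, else fall back to the global one.
  static Container getCurrentContainer() {
    Container container = CONTAINER_FOR_THREAD.get();
    return container != null ? container : GLOBAL_CONTAINER.get();
  }

  static void inc() {
    Container container = getCurrentContainer();
    if (container != null) {
      container.counter.incrementAndGet();
    }
  }

  // Returns {step-scoped count, global count} for one element that increments
  // once on the DoFn thread and once on a user-defined thread.
  static long[] run(boolean globalEnabled) {
    Container step = new Container();
    Container global = new Container();
    setGlobalContainer(globalEnabled ? global : null); // step 3: runner opts in
    CONTAINER_FOR_THREAD.set(step);
    try {
      inc(); // counter1: attributed to the step
      Thread user = new Thread(GlobalFallbackSketch::inc, "a user-defined thread");
      user.start(); // counter2: global container if enabled, otherwise dropped
      try {
        user.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } finally {
      CONTAINER_FOR_THREAD.remove();
      setGlobalContainer(null);
    }
    return new long[] {step.counter.get(), global.counter.get()};
  }

  public static void main(String[] args) {
    long[] on = run(true);
    long[] off = run(false);
    System.out.println(on[0] + " " + on[1] + " / " + off[0] + " " + off[1]); // prints "1 1 / 1 0"
  }
}
```

With several bundles running concurrently in one JVM, all fallback updates share the single global container, so they cannot be attributed to a particular bundle or step.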

This lets Beam runners decide whether to support updating metrics in user
threads. If a runner doesn't support it, nothing changes. If it does, users
don't need to change their application code. One drawback of this
short-term solution is that the global container has no step name. However,
since users in this case are updating metrics from their own threads, they
may not care much about step names; they can find the metrics by metric
name alone.

Please give me some feedback on this proposal, and let's discuss whether to
implement it.

Thanks,
Yixing

On Fri, Jan 17, 2020 at 1:37 PM Alex Amato <aj...@google.com> wrote:

Re: Updating Metrics Counter in user defined thread

Posted by Alex Amato <aj...@google.com>.
The one workaround I can suggest is to parallelize the work by keying the
data, if that is at all possible. This requires modifying the pipeline:
the first ParDo produces elements for different keys, followed by a GBK.
Then the downstream ParDo will have a thread for every key. If those
threads all compute something and you need to combine those results in a
single place, you may need to produce elements that rekey the data again,
followed by another GBK and a combiner.

Something sort of like this, if I understand correctly.

ParDo
GBK
ParDo
GBK
Combiner

But this workaround may not work for every problem. Also, metrics are
designed to be aggregated within a single UDF (ParDo or Combiner), so if
you need the counters to be aggregated across all of these operations as
well, this may not work.

The baked-in assumption of the thread-local setup is that parallelism is
typically handled by Beam, rather than by introducing a separate threading
model. Then again, perhaps breaking out of this threading model is more
common than we initially thought.

I hope that's helpful; sorry we don't have an easy fix.

On Fri, Jan 17, 2020 at 11:39 AM Robert Bradshaw <ro...@google.com>
wrote:

Re: Updating Metrics Counter in user defined thread

Posted by Robert Bradshaw <ro...@google.com>.
Yes, this is an issue with how counters are implemented, and there's
no good workaround. (We could use inheritable thread locals in Java,
but that assumes the thread does not outlive the DoFn, and would
probably work poorly with thread pools.) In the meantime, one can update
(say) a Map in the spawned threads and let the main thread in
processElement (and likely finishBundle) increment the metrics in a
thread-safe way based on the contents of the map.
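A minimal sketch of the Map-based workaround in plain Java, under our own assumptions: PendingCountsSketch and its methods are hypothetical names, and the `report` callback stands in for the real metric update. In a Beam DoFn, flush() would be called from processElement and/or finishBundle on the runner-managed thread, with a callback such as (name, delta) -> Metrics.counter("test", name).inc(delta).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical sketch: user threads record counts in a thread-safe map and
// the runner-managed thread later turns them into metric updates.
class PendingCountsSketch {
  private final ConcurrentHashMap<String, AtomicLong> pending = new ConcurrentHashMap<>();

  // Safe to call from any thread, including user-defined ones.
  void inc(String name) {
    pending.computeIfAbsent(name, k -> new AtomicLong()).incrementAndGet();
  }

  // Call only from the thread that has a MetricsContainer. getAndSet(0)
  // takes the accumulated delta atomically, so increments racing with a
  // flush are kept for the next flush rather than lost.
  void flush(BiConsumer<String, Long> report) {
    for (Map.Entry<String, AtomicLong> entry : pending.entrySet()) {
      long delta = entry.getValue().getAndSet(0);
      if (delta != 0) {
        report.accept(entry.getKey(), delta);
      }
    }
  }

  // Simulates user threads incrementing, followed by a flush on the main thread.
  static Map<String, Long> demo(int threads, int incrementsPerThread) {
    PendingCountsSketch counts = new PendingCountsSketch();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < incrementsPerThread; j++) {
          counts.inc("counter2");
        }
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    Map<String, Long> reported = new HashMap<>();
    counts.flush((name, delta) -> reported.merge(name, delta, Long::sum));
    return reported;
  }

  public static void main(String[] args) {
    System.out.println(demo(4, 100)); // prints {counter2=400}
  }
}
```

Any increments still pending when the last flush runs (for example, from threads that outlive the bundle) are not reported, which is the same lifetime concern raised elsewhere in this thread.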

On Fri, Jan 17, 2020 at 11:29 AM Yixing Zhang <to...@gmail.com> wrote: