Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2022/03/22 17:26:00 UTC

[jira] [Commented] (BEAM-13379) Incrementing a counter from a Python subthread doesn't seem to do anything

    [ https://issues.apache.org/jira/browse/BEAM-13379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510776#comment-17510776 ] 

Beam JIRA Bot commented on BEAM-13379:
--------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days, so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.


> Incrementing a counter from a Python subthread doesn't seem to do anything
> --------------------------------------------------------------------------
>
>                 Key: BEAM-13379
>                 URL: https://issues.apache.org/jira/browse/BEAM-13379
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>    Affects Versions: 2.34.0
>         Environment: Debian 5.10.46-5rodete1 (2021-09-28) x86_64 GNU/Linux
>            Reporter: Alistair Muldal
>            Priority: P2
>              Labels: stale-P2
>
> For example:
> {code:python}
> from concurrent import futures
> import threading
>
> from absl import app
> from absl import logging
> import apache_beam as beam
>
> NAMESPACE = 'METRICS_THREADS_REPRO'
>
> main_thread_counter = beam.metrics.Metrics.counter(
>     NAMESPACE, 'main_thread_counter')
> sub_thread_counter = beam.metrics.Metrics.counter(
>     NAMESPACE, 'sub_thread_counter')
>
>
> def increment_counter(counter):
>   counter.inc()
>   logging.info('Incremented counter %s from thread %s',
>                counter.metric_name, threading.current_thread().name)
>
>
> class IncrementCountersFn(beam.DoFn):
>
>   def setup(self):
>     self.executor = futures.ThreadPoolExecutor()
>
>   def process(self, idx):
>     increment_counter(main_thread_counter)
>     self.executor.submit(increment_counter, sub_thread_counter).result()
>     logging.info('Processed %i', idx)
>
>
> def main(argv):
>   if len(argv) > 1:
>     raise app.UsageError('Too many command-line arguments.')
>   p = beam.Pipeline()
>   _ = (
>       p
>       | 'Create' >> beam.Create(range(100))
>       | 'Process' >> beam.ParDo(IncrementCountersFn()))
>   result = p.run()
>   result.wait_until_finish()
>   filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
>   filtered_metrics = result.metrics().query(filter_by_namespace)
>   logging.info('Pipeline finished, metrics logged: %s', filtered_metrics)
>
>
> if __name__ == '__main__':
>   app.run(main)
> {code}
> Only {{main_thread_counter}} is incremented, not {{sub_thread_counter}}:
> {noformat}
> I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished, metrics logged: {'counters': [MetricResult(key=MetricKey(step=Process, metric=MetricName(namespace=METRICS_THREADS_REPRO, name=main_thread_counter), labels={}), committed=100, attempted=100)], 'distributions': [], 'gauges': []}
> {noformat}
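The report does not say why the subthread increment is lost. One plausible explanation, stated here only as an assumption, is that the Python SDK harness tracks the active metrics container per thread, so a pool thread that was never given a container has nowhere to record the update. The stdlib-only toy model below reproduces that symptom; every name in it (ToyContainer, current_container, inc, process_bundle) is hypothetical and it is not Beam's code. It also sketches one possible workaround: have the subthread return its delta and apply the increment on the thread that called process.

```python
import threading
from concurrent import futures

# Hypothetical per-thread slot for the "active" metrics container.
# Toy model only; this is NOT Beam's actual implementation.
_state = threading.local()


class ToyContainer:
    def __init__(self):
        self.counters = {}


def current_container():
    # A thread that never had a container installed sees None.
    return getattr(_state, 'container', None)


def inc(name, n=1):
    container = current_container()
    if container is None:
        return  # No container on this thread: the update is silently dropped.
    container.counters[name] = container.counters.get(name, 0) + n


def work_in_subthread():
    inc('sub_thread_counter')  # Lost: pool threads have no container.
    return 1  # Workaround: report the delta back to the caller instead.


def process_bundle(num_elements):
    container = ToyContainer()
    _state.container = container  # Installed only on the calling thread.
    try:
        with futures.ThreadPoolExecutor() as executor:
            for _ in range(num_elements):
                inc('main_thread_counter')
                delta = executor.submit(work_in_subthread).result()
                # Workaround: apply the subthread's delta on the calling thread.
                inc('relayed_sub_thread_counter', delta)
    finally:
        _state.container = None
    return container.counters


counters = process_bundle(100)
print(counters)  # {'main_thread_counter': 100, 'relayed_sub_thread_counter': 100}
```

In the toy model, as in the log above, only the counter touched on the calling thread shows up, while the relayed counter recovers the subthread's work. Whether the same relay pattern is the right fix in a real pipeline depends on how the harness actually scopes metrics, which this sketch does not confirm.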



--
This message was sent by Atlassian Jira
(v8.20.1#820001)