Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2022/03/22 17:26:00 UTC
[jira] [Updated] (BEAM-13379) Incrementing a counter from a Python subthread doesn't seem to do anything
[ https://issues.apache.org/jira/browse/BEAM-13379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-13379:
---------------------------------
Labels: stale-P2 (was: )
> Incrementing a counter from a Python subthread doesn't seem to do anything
> --------------------------------------------------------------------------
>
> Key: BEAM-13379
> URL: https://issues.apache.org/jira/browse/BEAM-13379
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Affects Versions: 2.34.0
> Environment: Debian 5.10.46-5rodete1 (2021-09-28) x86_64 GNU/Linux
> Reporter: Alistair Muldal
> Priority: P2
> Labels: stale-P2
>
> For example:
> {code:python}
> from concurrent import futures
> import threading
>
> from absl import app
> from absl import logging
> import apache_beam as beam
>
> NAMESPACE = 'METRICS_THREADS_REPRO'
>
> main_thread_counter = beam.metrics.Metrics.counter(
>     NAMESPACE, 'main_thread_counter')
> sub_thread_counter = beam.metrics.Metrics.counter(
>     NAMESPACE, 'sub_thread_counter')
>
>
> def increment_counter(counter):
>   counter.inc()
>   logging.info('Incremented counter %s from thread %s',
>                counter.metric_name, threading.current_thread().name)
>
>
> class IncrementCountersFn(beam.DoFn):
>
>   def setup(self):
>     self.executor = futures.ThreadPoolExecutor()
>
>   def process(self, idx):
>     # Incremented on the thread that calls process(): shows up in the results.
>     increment_counter(main_thread_counter)
>     # Incremented on a pool worker thread (and awaited): missing from results.
>     self.executor.submit(increment_counter, sub_thread_counter).result()
>     logging.info('Processed %i', idx)
>
>
> def main(argv):
>   if len(argv) > 1:
>     raise app.UsageError('Too many command-line arguments.')
>
>   p = beam.Pipeline()
>   _ = (
>       p
>       | 'Create' >> beam.Create(range(100))
>       | 'Process' >> beam.ParDo(IncrementCountersFn()))
>   result = p.run()
>   result.wait_until_finish()
>
>   filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
>   filtered_metrics = result.metrics().query(filter_by_namespace)
>   logging.info('Pipeline finished, metrics logged: %s', filtered_metrics)
>
>
> if __name__ == '__main__':
>   app.run(main)
> {code}
> Only {{main_thread_counter}} is incremented, not {{sub_thread_counter}}:
> {noformat}
> I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished, metrics logged: {'counters': [MetricResult(key=MetricKey(step=Process, metric=MetricName(namespace=METRICS_THREADS_REPRO, name=main_thread_counter), labels={}), committed=100, attempted=100)], 'distributions': [], 'gauges': []}
> {noformat}
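
The report does not say why the worker-thread increment is lost. One plausible explanation, assumed here and not confirmed by the report, is that the SDK routes counter updates to a metrics container found through per-thread state, so a thread created by the DoFn's own ThreadPoolExecutor has no container to update. The pure-Python sketch below models that assumption with a hypothetical ToyCounter class and a hypothetical do_work() helper (neither is part of Beam's API). It also shows one possible workaround: do the work on the worker thread, return a value, and increment the counter on the thread that called process().

```python
from concurrent import futures
import threading

# Hypothetical per-thread "metrics container" lookup. This is NOT Beam's API;
# it only models the assumption that an increment goes to whatever container
# is registered for the *current* thread.
_state = threading.local()


class ToyCounter:
    """Hypothetical counter that updates the current thread's container."""

    def __init__(self, name):
        self.name = name

    def inc(self, n=1):
        container = getattr(_state, 'container', None)
        if container is None:
            # No container registered on this thread: the update is dropped.
            return
        container[self.name] = container.get(self.name, 0) + n


def do_work():
    """Hypothetical work run on a worker thread; returns a count to record."""
    return 1


def run(num_elements):
    main_counter = ToyCounter('main_thread_counter')
    sub_counter = ToyCounter('sub_thread_counter')
    workaround_counter = ToyCounter('workaround_counter')
    # The toy "runner" registers a container only on the processing thread.
    _state.container = {}
    with futures.ThreadPoolExecutor() as executor:
        for _ in range(num_elements):
            main_counter.inc()
            # Incrementing inside a worker thread finds no container.
            executor.submit(sub_counter.inc).result()
            # Workaround: compute on the worker, increment on this thread.
            workaround_counter.inc(executor.submit(do_work).result())
    return _state.container


if __name__ == '__main__':
    print(run(100))  # → {'main_thread_counter': 100, 'workaround_counter': 100}
```

Applied to the repro above, the workaround would mean having the submitted function return its result and calling sub_thread_counter.inc() inside process(). This has not been verified against Beam 2.34.0.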
--
This message was sent by Atlassian Jira
(v8.20.1#820001)