Posted to issues@beam.apache.org by "Alistair Muldal (Jira)" <ji...@apache.org> on 2021/12/03 18:49:00 UTC
[jira] [Created] (BEAM-13379) Incrementing a counter from a Python subthread doesn't seem to do anything
Alistair Muldal created BEAM-13379:
--------------------------------------
Summary: Incrementing a counter from a Python subthread doesn't seem to do anything
Key: BEAM-13379
URL: https://issues.apache.org/jira/browse/BEAM-13379
Project: Beam
Issue Type: Bug
Components: beam-model
Affects Versions: 2.34.0
Environment: Debian 5.10.46-5rodete1 (2021-09-28) x86_64 GNU/Linux
Reporter: Alistair Muldal
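A counter incremented from a worker thread started inside a DoFn does not show up in the pipeline's metrics, while the same kind of counter incremented from the thread that runs {{process}} does. For example: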
{code:python}
from concurrent import futures
import threading

from absl import app
from absl import logging
import apache_beam as beam

NAMESPACE = 'METRICS_THREADS_REPRO'

main_thread_counter = beam.metrics.Metrics.counter(
    NAMESPACE, 'main_thread_counter')
sub_thread_counter = beam.metrics.Metrics.counter(
    NAMESPACE, 'sub_thread_counter')


def increment_counter(counter):
  counter.inc()
  logging.info('Incremented counter %s from thread %s',
               counter.metric_name, threading.current_thread().name)


class IncrementCountersFn(beam.DoFn):

  def setup(self):
    self.executor = futures.ThreadPoolExecutor()

  def process(self, idx):
    increment_counter(main_thread_counter)
    self.executor.submit(increment_counter, sub_thread_counter).result()
    logging.info('Processed %i', idx)


def main(argv):
  if len(argv) > 1:
    raise app.UsageError('Too many command-line arguments.')

  p = beam.Pipeline()
  _ = (
      p
      | 'Create' >> beam.Create(range(100))
      | 'Process' >> beam.ParDo(IncrementCountersFn()))
  result = p.run()
  result.wait_until_finish()
  filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
  filtered_metrics = result.metrics().query(filter_by_namespace)
  logging.info('Pipeline finished, metrics logged: %s', filtered_metrics)


if __name__ == '__main__':
  app.run(main)
{code}
Only {{main_thread_counter}} is incremented, not {{sub_thread_counter}}:
{noformat}
I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished, metrics logged: {'counters': [MetricResult(key=MetricKey(step=Process, metric=MetricName(namespace=METRICS_THREADS_REPRO, name=main_thread_counter), labels={}), committed=100, attempted=100)], 'distributions': [], 'gauges': []}
{noformat}
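A possible workaround, sketched here with the standard library only: have the worker thread return the amount to add and apply the increment on the calling thread. This assumes that metric updates are attributed through per-thread context, which this report does not confirm. {{ToyCounter}}, {{_context}}, {{do_work}} and {{process_all}} are hypothetical stand-ins for illustration, not Beam APIs.

```python
from concurrent import futures
import threading

# Toy stand-in for a metrics backend. This is NOT Beam's implementation; it
# only models the assumption that updates are attributed via per-thread state.
_context = threading.local()


class ToyCounter:
  """Hypothetical counter that records only if the calling thread has a container."""

  def __init__(self, name):
    self.name = name

  def inc(self, n=1):
    container = getattr(_context, 'container', None)
    if container is None:
      return  # No container on this thread: the update is silently dropped.
    container[self.name] = container.get(self.name, 0) + n


def do_work(idx):
  """Runs in the worker thread and returns a delta instead of incrementing."""
  return 1


def process_all(indices):
  container = {}
  _context.container = container  # Set on the calling thread only.
  direct = ToyCounter('sub_thread_direct')
  relayed = ToyCounter('sub_thread_relayed')
  try:
    with futures.ThreadPoolExecutor() as executor:
      for idx in indices:
        # Incremented inside the worker thread: dropped in this toy model.
        executor.submit(direct.inc).result()
        # Delta computed in the worker, applied on the calling thread.
        delta = executor.submit(do_work, idx).result()
        relayed.inc(delta)
  finally:
    del _context.container
  return container


metrics = process_all(range(100))
```

In this toy model only {{sub_thread_relayed}} ends up in {{metrics}}. Applied to the repro above, the equivalent change would be for {{process}} to call {{sub_thread_counter.inc()}} itself after the future's {{result()}} returns, rather than inside the submitted function.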