You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 21:53:50 UTC
[GitHub] [beam] damccorm opened a new issue, #21217: Incrementing a counter from a Python subthread doesn't seem to do anything
damccorm opened a new issue, #21217:
URL: https://github.com/apache/beam/issues/21217
For example:
```
from concurrent import futures
import threading
from absl import app
from absl import logging
import
apache_beam as beam
NAMESPACE = 'METRICS_THREADS_REPRO'
main_thread_counter = beam.metrics.Metrics.counter(
NAMESPACE, 'main_thread_counter')
sub_thread_counter = beam.metrics.Metrics.counter(
NAMESPACE,
'sub_thread_counter')
def increment_counter(counter):
counter.inc()
logging.info('Incremented
counter %s from thread %s',
counter.metric_name, threading.current_thread().name)
class
IncrementCountersFn(beam.DoFn):
def setup(self):
self.executor = futures.ThreadPoolExecutor()
def process(self, idx):
increment_counter(main_thread_counter)
self.executor.submit(increment_counter,
sub_thread_counter).result()
logging.info('Processed %i', idx)
def main(argv):
if len(argv)
> 1:
raise app.UsageError('Too many command-line arguments.')
p = beam.Pipeline()
_ = (
p
| 'Create' >> beam.Create(range(100))
| 'Process' >> beam.ParDo(IncrementCountersFn()))
result = p.run()
result.wait_until_finish()
filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
filtered_metrics = result.metrics().query(filter_by_namespace)
logging.info('Pipeline finished, metrics
logged: %s', filtered_metrics)
if __name__ == '__main__':
app.run(main)
```
Only `main_thread_counter` is incremented, not `sub_thread_counter`:
```
I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished, metrics logged: {'counters':
[MetricResult(key=MetricKey(step=Process, metric=MetricName(namespace=METRICS_THREADS_REPRO, name=main_thread_
counter),
labels={}), committed=100, attempted=100)], 'distributions': [], 'gauges': []}
```
Imported from Jira [BEAM-13379](https://issues.apache.org/jira/browse/BEAM-13379). Original Jira may contain additional context.
Reported by: alimuldal.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org