Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 21:53:50 UTC

[GitHub] [beam] damccorm opened a new issue, #21217: Incrementing a counter from a Python subthread doesn't seem to do anything

damccorm opened a new issue, #21217:
URL: https://github.com/apache/beam/issues/21217

   For example:
   
   ```
   from concurrent import futures
   import threading

   from absl import app
   from absl import logging
   import apache_beam as beam

   NAMESPACE = 'METRICS_THREADS_REPRO'

   main_thread_counter = beam.metrics.Metrics.counter(
       NAMESPACE, 'main_thread_counter')
   sub_thread_counter = beam.metrics.Metrics.counter(
       NAMESPACE, 'sub_thread_counter')


   def increment_counter(counter):
     counter.inc()
     logging.info('Incremented counter %s from thread %s',
                  counter.metric_name, threading.current_thread().name)


   class IncrementCountersFn(beam.DoFn):

     def setup(self):
       self.executor = futures.ThreadPoolExecutor()

     def process(self, idx):
       increment_counter(main_thread_counter)
       self.executor.submit(increment_counter, sub_thread_counter).result()
       logging.info('Processed %i', idx)


   def main(argv):
     if len(argv) > 1:
       raise app.UsageError('Too many command-line arguments.')

     p = beam.Pipeline()
     _ = (
         p
         | 'Create' >> beam.Create(range(100))
         | 'Process' >> beam.ParDo(IncrementCountersFn()))

     result = p.run()
     result.wait_until_finish()

     filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
     filtered_metrics = result.metrics().query(filter_by_namespace)
     logging.info('Pipeline finished, metrics logged: %s', filtered_metrics)


   if __name__ == '__main__':
     app.run(main)
   ```
   
   
   Only `main_thread_counter` is incremented, not `sub_thread_counter`:
   ```
   I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished, metrics logged: {'counters': [MetricResult(key=MetricKey(step=Process, metric=MetricName(namespace=METRICS_THREADS_REPRO, name=main_thread_counter), labels={}), committed=100, attempted=100)], 'distributions': [], 'gauges': []}
   ```
   
   
   Imported from Jira [BEAM-13379](https://issues.apache.org/jira/browse/BEAM-13379). Original Jira may contain additional context.
   Reported by: alimuldal.
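
One possible explanation for the behavior reported above, which the report itself does not confirm, is that a counter finds the active metrics container through per-thread state, so an increment made on a pool thread finds no container and is silently dropped. The sketch below is pure Python and does not use or reproduce Beam's implementation. `Container`, `current_container`, `inc`, and `run_with_container` are hypothetical names introduced only to illustrate the idea, along with one way a caller might hand the container to a worker thread explicitly.

```python
import threading
from concurrent import futures

# Hypothetical per-thread slot holding the "current" metrics container.
# This is an assumption for illustration, not Beam's actual data structure.
_state = threading.local()


class Container:
  """Hypothetical thread-safe store of counter values."""

  def __init__(self):
    self.counts = {}
    self._lock = threading.Lock()

  def inc(self, name):
    with self._lock:
      self.counts[name] = self.counts.get(name, 0) + 1


def current_container():
  # Threads that never set a container see None.
  return getattr(_state, 'container', None)


def inc(name):
  container = current_container()
  if container is not None:
    container.inc(name)
  # With no container on this thread, the increment is silently dropped.


def run_with_container(container, fn, *args):
  """Runs fn on the calling thread with `container` installed as current."""
  previous = current_container()
  _state.container = container
  try:
    return fn(*args)
  finally:
    _state.container = previous


def process(executor, container):
  inc('main_thread_counter')  # Recorded: this thread has a container.
  executor.submit(inc, 'sub_thread_counter').result()  # Dropped.
  executor.submit(
      run_with_container, container, inc, 'propagated_counter').result()


container = Container()
with futures.ThreadPoolExecutor() as executor:
  _state.container = container  # Only the main thread gets the container.
  try:
    for _ in range(100):
      process(executor, container)
  finally:
    _state.container = None

print(container.counts)
```

In this sketch only `main_thread_counter` and `propagated_counter` are recorded, which mirrors the shape of the reported output. Whether the same mechanism is responsible in Beam would need to be checked against the Beam source.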

