Posted to issues@beam.apache.org by "Alistair Muldal (Jira)" <ji...@apache.org> on 2021/12/03 18:49:00 UTC

[jira] [Created] (BEAM-13379) Incrementing a counter from a Python subthread doesn't seem to do anything

Alistair Muldal created BEAM-13379:
--------------------------------------

             Summary: Incrementing a counter from a Python subthread doesn't seem to do anything
                 Key: BEAM-13379
                 URL: https://issues.apache.org/jira/browse/BEAM-13379
             Project: Beam
          Issue Type: Bug
          Components: beam-model
    Affects Versions: 2.34.0
         Environment: Debian 5.10.46-5rodete1 (2021-09-28) x86_64 GNU/Linux
            Reporter: Alistair Muldal


For example:

{code:python}
from concurrent import futures
import threading

from absl import app
from absl import logging
import apache_beam as beam

NAMESPACE = 'METRICS_THREADS_REPRO'

main_thread_counter = beam.metrics.Metrics.counter(
    NAMESPACE, 'main_thread_counter')
sub_thread_counter = beam.metrics.Metrics.counter(
    NAMESPACE, 'sub_thread_counter')


def increment_counter(counter):
  counter.inc()
  logging.info('Incremented counter %s from thread %s',
               counter.metric_name, threading.current_thread().name)


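class IncrementCountersFn(beam.DoFn):

  def setup(self):
    self.executor = futures.ThreadPoolExecutor()

  def process(self, idx):
    # Runs on the thread executing the DoFn: this increment is reported.
    increment_counter(main_thread_counter)
    # Runs on a pool thread: this increment never shows up in the results.
    self.executor.submit(increment_counter, sub_thread_counter).result()
    logging.info('Processed %i', idx)

  def teardown(self):
    # Release the pool's worker threads when the DoFn instance is discarded.
    self.executor.shutdown()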


def main(argv):
  if len(argv) > 1:
    raise app.UsageError('Too many command-line arguments.')
  p = beam.Pipeline()
  _ = (
      p
      | 'Create' >> beam.Create(range(100))
      | 'Process' >> beam.ParDo(IncrementCountersFn()))

  result = p.run()
  result.wait_until_finish()

  filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
  filtered_metrics = result.metrics().query(filter_by_namespace)
  logging.info('Pipeline finished, metrics logged: %s', filtered_metrics)


if __name__ == '__main__':
  app.run(main)
{code}

Only {{main_thread_counter}} appears in the queried metrics. {{sub_thread_counter}} is never reported, even though {{inc()}} is called on it once per element:
{noformat}
I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished, metrics logged: {'counters': [MetricResult(key=MetricKey(step=Process, metric=MetricName(namespace=METRICS_THREADS_REPRO, name=main_thread_counter), labels={}), committed=100, attempted=100)], 'distributions': [], 'gauges': []}
{noformat}
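A possible explanation, not confirmed in this report: the Python SDK appears to look up the active metrics container in per-thread state, so an {{inc()}} call made on a {{ThreadPoolExecutor}} worker has no container to record into and is silently dropped. The stdlib-only sketch below models that assumed behavior with a hypothetical {{ThreadScopedCounter}} (not a Beam class) and shows one workaround: do the work on the pool thread, return how many increments are owed, and call {{inc()}} on the calling (DoFn) thread.

{code:python}
import threading
from concurrent import futures

# Assumed model: a metrics container is bound per thread via threading.local,
# so only threads that have one bound can record counter updates.
_bound = threading.local()


class ThreadScopedCounter:
    """Hypothetical stand-in for a Beam counter; not a Beam API."""

    def __init__(self):
        self.total = 0

    def inc(self, n=1):
        # Updates from a thread with no bound container are dropped silently.
        if getattr(_bound, "container", False):
            self.total += n


def work(item):
    # Do the real work on the pool thread, and report how many increments
    # are owed instead of calling inc() here.
    return 1


def process_all(items):
    lost, fixed = ThreadScopedCounter(), ThreadScopedCounter()
    _bound.container = True  # The calling ("DoFn") thread has a container.
    try:
        with futures.ThreadPoolExecutor() as pool:
            for item in items:
                # Like the repro: inc() runs on a pool thread and is dropped.
                pool.submit(lost.inc).result()
                # Workaround: apply the increment on the calling thread.
                fixed.inc(pool.submit(work, item).result())
    finally:
        _bound.container = False
    return lost.total, fixed.total


print(process_all(range(100)))  # (0, 100)
{code}

Applied to the repro, this would mean having the submitted function return a value and calling {{sub_thread_counter.inc(...)}} inside {{process}} after {{.result()}}.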


