Posted to issues@beam.apache.org by "Jay K (Jira)" <ji...@apache.org> on 2020/09/10 17:13:00 UTC

[jira] [Created] (BEAM-10872) Value Provider functionality broken in python sdk

Jay K created BEAM-10872:
----------------------------

             Summary: Value Provider functionality broken in python sdk
                 Key: BEAM-10872
                 URL: https://issues.apache.org/jira/browse/BEAM-10872
             Project: Beam
          Issue Type: Bug
          Components: runner-dataflow
    Affects Versions: 2.22.0
         Environment: requirements.txt can be found here: https://gist.github.com/jay-karimi/20524163266e7c90b2fc9b7a6b401e43
            Reporter: Jay K


Whether using a custom function or an IO connector (tested with `WriteToBigQuery`), the Dataflow job complains that the runtime value provider's `get()` is being called from a non-runtime context:


{code:python}
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class ExampleDoF(beam.DoFn):
    def __init__(self, value_provider):
        self.value_provider = value_provider

    def process(self, el):
        logging.info(f'el: {el}')
        logging.info(f'value provider: {self.value_provider.get()}')
        yield el


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--value_provider', type=int)


# Parse the command line; arguments argparse does not recognize are passed on to Beam.
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args()
pipeline_options = PipelineOptions(pipeline_args)
user_options = pipeline_options.view_as(UserOptions)

with beam.Pipeline(options=pipeline_options) as pipeline:
    results = (
        pipeline
        | beam.Create(['element'])
        # The ValueProvider object itself is passed in; get() is only called inside process().
        | beam.ParDo(ExampleDoF(user_options.value_provider))
    )
{code}
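
To narrow down when the provider is actually readable, a small diagnostic helper could check `is_accessible()` before calling `get()`. The sketch below is only an illustration and not a fix: `RuntimeOnlyProvider` and `safe_get` are hypothetical names, and the stand-in class is a simplified model rather than Beam's implementation. It assumes only that the provider exposes `is_accessible()` and `get()`, as Beam's `ValueProvider` interface does.

```python
class RuntimeOnlyProvider:
    """Hypothetical stand-in (not Beam's class): readable only after resolve()."""

    def __init__(self, name):
        self.name = name
        self._value = None
        self._resolved = False

    def resolve(self, value):
        # Models the point at which runtime options become available.
        self._value = value
        self._resolved = True

    def is_accessible(self):
        return self._resolved

    def get(self):
        if not self._resolved:
            raise RuntimeError(
                f'{self.name}.get() not called from a runtime context')
        return self._value


def safe_get(provider, default=None):
    """Return provider.get() if the provider is accessible, else default."""
    if provider.is_accessible():
        return provider.get()
    return default


if __name__ == '__main__':
    vp = RuntimeOnlyProvider('value_provider')
    print(safe_get(vp, default='<unresolved>'))  # prints <unresolved>
    vp.resolve(42)
    print(safe_get(vp))  # prints 42
```

Logging the result of `safe_get(self.value_provider, default='<unresolved>')` from inside `process()` in place of the direct `get()` call would show whether the provider has been resolved on the worker without failing the bundle.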


