Posted to user@beam.apache.org by André Rocha Silva <a....@portaltelemedicina.com.br> on 2020/02/13 18:48:18 UTC

Problems with side inputs on dataflow

Hello everybody

I am facing a problem with a pipeline that runs perfectly on the DirectRunner,
but on Dataflow it turns into a mess: the element and the side input
(access) get swapped.

The side input reads a single line containing credentials.

Any thoughts on how it's done are more than welcome. How do you manage
sensitive information in templated pipelines?

It is something like this:

import apache_beam as beam


class GetPrediction(beam.DoFn):

    def __init__(self, input1, input2):
        self.input1 = input1
        self.input2 = input2

    def process(self, element, access):
        # access is the side input: a single "user<TAB>token" line.
        user, token = access.split('\t')

        # element is one line of the main input.
        thing1, thing2 = element.split('\t')


# p (the Pipeline) and user_options are created elsewhere.
credentials_pipe = (
    p
    | 'Get credentials' >> beam.io.ReadFromText(user_options.credentials)
)

main_pipe = (
    p
    | 'Get information' >> beam.io.ReadFromText(user_options.input_file)
    | 'Get prediction from severity' >> beam.ParDo(GetPrediction(
        user_options.input1,
        user_options.input2,
    ), beam.pvalue.AsSingleton(credentials_pipe))
)

p.run()


-- 

   *ANDRÉ ROCHA SILVA*
  * DATA ENGINEER*
  (48) 3181-0611

  <https://www.linkedin.com/in/andre-rocha-silva/> /andre-rocha-silva/
<http://portaltelemedicina.com.br/>
<https://www.youtube.com/channel/UC0KH36-OXHFIKjlRY2GyAtQ>
<https://pt-br.facebook.com/PortalTelemedicina/>
<https://www.linkedin.com/company/9426084/>

Re: Problems with side inputs on dataflow

Posted by Luke Cwik <lc...@google.com>.
That seems like a bug if Dataflow is reordering your parameters, since you
are correct in thinking that the side input parameters always follow the
element parameter.

I would suggest asking Google Cloud customer support about this and providing
a job ID and a simple reproduction pipeline.

On Thu, Feb 13, 2020 at 11:24 AM André Rocha Silva <
a.silva@portaltelemedicina.com.br> wrote:

> It is getting access as an element and vice versa, it switches both. So I
> am never sure whether I am receiving (user   token) or (thing1   thing2).

Re: Problems with side inputs on dataflow

Posted by André Rocha Silva <a....@portaltelemedicina.com.br>.
It receives access as the element and vice versa; it swaps the two. So I
am never sure whether I am receiving (user   token) or (thing1   thing2).

On Thu, Feb 13, 2020 at 4:17 PM Luke Cwik <lc...@google.com> wrote:

> If Dataflow changes the access pattern, it should also apply a wrapper
> function around how the side input is accessed and all of this should be
> effectively opaque to your execution.
>
> Can you provide more details as to what the "mess" is?

Re: Problems with side inputs on dataflow

Posted by Luke Cwik <lc...@google.com>.
If Dataflow changes the access pattern, it should also apply a wrapper
function around how the side input is accessed and all of this should be
effectively opaque to your execution.

Can you provide more details as to what the "mess" is?
