Posted to user@beam.apache.org by "Manninger, Matyas" <ma...@veolia.com> on 2020/12/17 15:29:20 UTC

error with periodic impulse

Dear Beam users,

I am trying to write a pipeline that uses a BigQuery table as a side
input, and I want Beam to refresh the in-memory content periodically.
For this, I am using the recommended PeriodicImpulse transform, followed
by a read from BigQuery.
I have defined the following class for this:

> class BigQuerySideinput(beam.PTransform):
>   def __init__(self, query, refresh_interval):
>     super(BigQuerySideinput, self).__init__()
>     self.query = query
>     self.refresh_interval = refresh_interval
>
>   def expand(self, pcoll):
>     return (
>         pcoll
>         | 'periodic_impulse' >> beam.transforms.periodicsequence.PeriodicImpulse(
>             fire_interval=self.refresh_interval, apply_windowing=True)
>         | 'inject_query' >> beam.Map(lambda x: self.query)
>         | 'run_query' >> beam.ParDo(BigQueryRead()))


Here BigQueryRead() is a DoFn that takes the query and reads the data. When
I try to run a pipeline that contains this step, I get the following error:

> RuntimeError: Transform node
> AppliedPTransform(EmployeeLookup/Employee:Collect/periodic_impulse/GenSequence/ProcessKeyedElements/GroupByKey/GroupByKey,
> _GroupByKeyOnly) was not replaced as expected.

Does anyone have an idea what might cause this error and how to fix it?
If there is a better way to get the same behavior, I am open to
suggestions as well. Thanks in advance. (If more details are needed, I
would be happy to provide them.)
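
To make the intended behavior concrete, here is a minimal sketch outside
of Beam of what I mean by refreshing the content in memory periodically.
It is only an illustration and not the pipeline code: RefreshingLookup,
fetch and clock are hypothetical names, and fetch stands in for whatever
actually runs the BigQuery query and returns the rows as a dict.

```python
import time


class RefreshingLookup:
    """Keeps a lookup table in memory and reloads it when it is too old.

    `fetch` is a hypothetical zero-argument callable that returns a dict
    (a stand-in for the real BigQuery read). `refresh_interval` is in
    seconds. `clock` is injectable so the behavior can be tested without
    sleeping.
    """

    def __init__(self, fetch, refresh_interval, clock=time.monotonic):
        self._fetch = fetch
        self._refresh_interval = refresh_interval
        self._clock = clock
        self._loaded_at = None  # time of the last successful load
        self._table = {}

    def get(self, key, default=None):
        now = self._clock()
        # Reload on first use, or once the cached copy has reached
        # the refresh interval in age.
        if (self._loaded_at is None
                or now - self._loaded_at >= self._refresh_interval):
            self._table = self._fetch()
            self._loaded_at = now
        return self._table.get(key, default)
```

In the pipeline, the role of the clock check would be played by
PeriodicImpulse firing every refresh_interval seconds, and the table
would be exposed to the main input as a side input.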

Best regards,
Matyas Manninger