Posted to user@beam.apache.org by "Manninger, Matyas" <ma...@veolia.com> on 2020/12/17 15:29:20 UTC
error with periodic impulse
Dear Beam users,
I am trying to write a pipeline that uses a BigQuery table as a side input,
but I want Beam to refresh the in-memory copy of the table periodically.
For this, I am using the recommended PeriodicImpulse followed by a read from
BigQuery.
I have defined the following class for this:
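    import apache_beam as beam
    from apache_beam.transforms.periodicsequence import PeriodicImpulse

    class BigQuerySideinput(beam.PTransform):
        def __init__(self, query, refresh_interval):
            super(BigQuerySideinput, self).__init__()
            self.query = query
            self.refresh_interval = refresh_interval

        def expand(self, pcoll):
            # pcoll should be the pipeline root (PBegin): PeriodicImpulse is a source.
            return (pcoll
                    # One element every refresh_interval seconds, each placed in
                    # its own fixed window because apply_windowing=True.
                    | 'periodic_impulse' >> PeriodicImpulse(
                        fire_interval=self.refresh_interval,
                        apply_windowing=True)
                    # Turn each tick into the query string.
                    | 'inject_query' >> beam.Map(lambda x: self.query)
                    # BigQueryRead is my own DoFn (described below, not shown).
                    | 'run_query' >> beam.ParDo(BigQueryRead()))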
Here BigQueryRead() is a DoFn that takes the query and reads the data. When
I try to run a pipeline with this step in it, I get the following error:
    RuntimeError: Transform node
    AppliedPTransform(EmployeeLookup/Employee:Collect/periodic_impulse/GenSequence/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly)
    was not replaced as expected.
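BigQueryRead itself is not shown above. As a rough, hypothetical sketch of what its per-element logic might look like, the generator below takes one query string and yields each result row as a plain dict. It assumes a client that behaves like google-cloud-bigquery's `Client`, where `query(sql).result()` iterates over result rows; the client is passed in (a stand-in here) so the sketch runs without GCP credentials. `run_query_rows` and `FakeClient` are illustrative names only, not part of any library.

```python
from typing import Any, Dict, Iterable, Iterator, List


def run_query_rows(client: Any, query: str) -> Iterator[Dict[str, Any]]:
    """Hypothetical body of BigQueryRead.process: run one query, yield rows.

    `client` is assumed to behave like google.cloud.bigquery.Client, whose
    query(...) returns a job and job.result() iterates over result rows.
    """
    for row in client.query(query).result():
        # Convert each row to a plain dict so downstream steps (and the side
        # input view) do not depend on the BigQuery Row type.
        yield dict(row)


class _FakeJob:
    """Stand-in for a query job; result() returns canned rows."""

    def __init__(self, rows: List[Dict[str, Any]]):
        self._rows = rows

    def result(self) -> Iterable[Dict[str, Any]]:
        return iter(self._rows)


class FakeClient:
    """Stand-in client that records queries instead of calling BigQuery."""

    def __init__(self, rows: List[Dict[str, Any]]):
        self.rows = rows
        self.queries: List[str] = []

    def query(self, query: str) -> _FakeJob:
        self.queries.append(query)
        return _FakeJob(self.rows)


if __name__ == "__main__":
    client = FakeClient([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])
    # Placeholder query; no real table is read.
    print(list(run_query_rows(client, "SELECT id, name FROM my_dataset.employees")))
```

In an actual DoFn, this body would sit in `process(self, query)`, with the real client created once in `setup()` rather than per element.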
Does anyone have an idea what might cause the issue and how to fix it? Also,
if there is a better way to get the same behavior, I am open to suggestions.
Thanks in advance. (If more details are needed, I would be happy to provide
them.)
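One possible alternative to a PeriodicImpulse-driven side input is to keep the lookup table inside the DoFn and reload it once it is older than the refresh interval. The stdlib-only sketch below shows only that bookkeeping, under stated assumptions: `RefreshingLookup`, `loader` and `clock` are hypothetical names, `loader` stands in for running the BigQuery query and building a dict, and in a pipeline the object would typically be created in `DoFn.setup()`.

```python
import time
from typing import Callable, Dict, Optional


class RefreshingLookup:
    """Hypothetical in-memory lookup that reloads itself after a fixed interval.

    `loader` stands in for "run the BigQuery query and build a dict";
    `clock` defaults to time.monotonic and can be replaced in tests.
    """

    def __init__(self, loader: Callable[[], Dict], refresh_interval: float,
                 clock: Callable[[], float] = time.monotonic):
        self._loader = loader
        self._refresh_interval = refresh_interval
        self._clock = clock
        self._data: Dict = {}
        self._loaded_at: Optional[float] = None

    def _maybe_refresh(self) -> None:
        now = self._clock()
        # Reload on first use, or when the cached copy is at least
        # refresh_interval seconds old.
        if self._loaded_at is None or now - self._loaded_at >= self._refresh_interval:
            self._data = self._loader()
            self._loaded_at = now

    def get(self, key, default=None):
        self._maybe_refresh()
        return self._data.get(key, default)


if __name__ == "__main__":
    versions = iter([{"e1": "Alice"}, {"e1": "Alicia"}])
    fake_now = [0.0]
    lookup = RefreshingLookup(lambda: next(versions), refresh_interval=60,
                              clock=lambda: fake_now[0])
    print(lookup.get("e1"))  # → Alice (first load)
    fake_now[0] = 30.0
    print(lookup.get("e1"))  # → Alice (still cached)
    fake_now[0] = 61.0
    print(lookup.get("e1"))  # → Alicia (reloaded)
```

The trade-off is that each worker refreshes on its own schedule, so different workers may briefly see different versions of the table, whereas a side input gives every worker the same view for a given window.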
Best regards,
Matyas Manninger