Posted to issues@beam.apache.org by "Evgeny (JIRA)" <ji...@apache.org> on 2018/11/12 10:32:00 UTC
[jira] [Updated] (BEAM-6036) How to periodically refresh side inputs.
[ https://issues.apache.org/jira/browse/BEAM-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Evgeny updated BEAM-6036:
-------------------------
Description:
I have followed the example provided here [https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1] in the "Pattern: Slowly-changing lookup cache" section. I've converted the pseudo-code from the article into this Java code:
return p
    .apply("GenerateSequence", GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
    .apply("GenerateSequenceWindow",
        Window.<Long>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
    .apply("RetrieveKVs", ParDo.of(new RetrieveKVs()))
    .apply("ToMap", View.asMap());
RetrieveKVs queries a BigQuery table and outputs KVs.
The issue here is that the resulting map mixes KVs from different periods: the sequence emits one element every hour, yet the resulting map includes KVs from two adjacent hours.
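To make the symptom concrete, here is a minimal plain-Java model of the difference between building a map from individually emitted entries and keeping only the latest complete map. It is not Beam code and does not claim to reproduce what any runner does internally; the class name, method names, and sample data are all hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model only: no Beam APIs are used, and every name here is hypothetical.
class SideInputRefreshModel {

    // Models a reader that builds its lookup map from individual entries,
    // folding in whichever entries it has seen so far, regardless of refresh.
    static Map<String, String> buildFromEntries(List<Map.Entry<String, String>> seenEntries) {
        Map<String, String> view = new HashMap<>();
        for (Map.Entry<String, String> entry : seenEntries) {
            view.put(entry.getKey(), entry.getValue());
        }
        return view;
    }

    // Models a reader that receives one complete map per refresh
    // and keeps only the most recent one.
    static Map<String, String> latestSnapshot(List<Map<String, String>> snapshots) {
        return snapshots.get(snapshots.size() - 1);
    }

    public static void main(String[] args) {
        // Hour 0 holds keys "a" and "b"; by hour 1, "b" was deleted and "a" changed.
        List<Map.Entry<String, String>> entries = List.of(
            Map.entry("a", "hour0"),
            Map.entry("b", "hour0"),
            Map.entry("a", "hour1"));
        // The stale "b" entry from hour 0 survives next to the hour 1 value of "a".
        System.out.println("from entries:    " + buildFromEntries(entries));

        List<Map<String, String>> snapshots = List.of(
            Map.of("a", "hour0", "b", "hour0"),
            Map.of("a", "hour1"));
        // Only hour 1 data is visible.
        System.out.println("latest snapshot: " + latestSnapshot(snapshots));
    }
}
```

In this model the entry-based map can contain data from two refreshes at once, while the snapshot-based one cannot. Whether that is the actual cause of the mixing in the pipeline is an assumption, not something confirmed here.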
In an attempt to solve it I tried using View.asSingleton() instead.
return p
    .apply("GenerateSequence", GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
    .apply("GenerateSequenceWindow",
        Window.<Long>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
    .apply("RetrieveMap", ParDo.of(new RetrieveMap()))
    .apply("ToMap", View.asSingleton());
RetrieveMap queries data from BigQuery and outputs the complete map. This approach not only produces flaky tests, which fail 1 time out of 10 with the exception:
Caused by: java.lang.IllegalArgumentException: Empty PCollection accessed as a singleton view. Consider setting withDefault to provide a default value
but it also doesn't seem to work. In the logs I see that RetrieveMap is called every hour, but the pipeline using the side input gets stale data.
Is there a real working example for how to make a side input refresh periodically?
was:
I have followed the example provided here [https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1] in the "Pattern: Slowly-changing lookup cache" section. I've converted the pseudo-code from the article into this Java code:
return p
    .apply("GenerateSequence", GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
    .apply("GenerateSequenceWindow",
        Window.<Long>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
    .apply("ConvertToKVs", ParDo.of(new RetrieveToKVs()))
    .apply("ToMap", View.asMap());
RetrieveToKVs queries a BigQuery table and outputs KVs.
The issue here is that the resulting map mixes KVs from different periods: the sequence emits one element every hour, yet the resulting map includes KVs from two adjacent hours.
In an attempt to solve it I tried using View.asSingleton() instead.
return p
    .apply("GenerateSequence", GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
    .apply("GenerateSequenceWindow",
        Window.<Long>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
    .apply("ConvertToKVs", ParDo.of(new RetrieveMap()))
    .apply("ToMap", View.asSingleton());
RetrieveMap queries data from BigQuery and outputs the complete map. This approach not only produces flaky tests, which fail 1 time out of 10 with the exception:
Caused by: java.lang.IllegalArgumentException: Empty PCollection accessed as a singleton view. Consider setting withDefault to provide a default value
but it also doesn't seem to work. In the logs I see that RetrieveMap is called every hour, but the pipeline using the side input gets stale data.
Is there a real working example for how to make a side input refresh periodically?
> How to periodically refresh side inputs.
> ----------------------------------------
>
> Key: BEAM-6036
> URL: https://issues.apache.org/jira/browse/BEAM-6036
> Project: Beam
> Issue Type: Bug
> Components: beam-model
> Reporter: Evgeny
> Assignee: Kenneth Knowles
> Priority: Blocker
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)