Posted to user@beam.apache.org by Kevin Peterson <ke...@google.com> on 2017/06/14 18:39:12 UTC

Fwd: Creating side input map with global window

Hi all,

I am working on a (streaming) pipeline which reads elements from Pubsub,
and schemas for those elements from a separate pubsub topic. I'd like to be
able to create a side input map from the schema topic, and have that
available to the main pipeline for parsing. Each message on the schema
pubsub topic contains all schemas I care about, so for every new message, I
want to generate a new map that will be available to the main pipeline
(eventual consistency is fine). I don't have any windows or triggers on the
main flow, since I really just want each element to be processed as it
arrives, using whatever the latest schema available is.

I am currently trying this with:

PCollection<KV<String, String>> schema = pipeline
        .apply("Read Schema",
                PubsubIO.readStrings().fromTopic("topic_for_schema"))
        .apply(Window.<String>into(new GlobalWindows()).triggering(
                Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
        .apply("Create Schema", ParDo.of(new SchemaDirectory.GenerateSchema()));  // outputs around 100 elements for each input

PCollectionView<Map<String, String>> schemaView =
        schema.apply(View.<String, String>asMap());

pipeline
        .apply("Read Elements", PubsubIO.readStrings().fromTopic("topic_for_elements"))
        .apply("Parse Elements",
                ParDo.of(new DoFn<String, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        String name = getNameFromElement(c.element());
                        String schema = c.sideInput(schemaView).get(name);
                        c.output(parse(c, schema));
                    }
                }).withSideInputs(schemaView))
        .apply("Write to Table", BigQueryIO.writeTableRows()) // Other BQ options not copied.

When running this pipeline, the
View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
stage never emits any elements, and so the pipeline never progresses. I can
see the messages at the input stage, but nothing appears on the output.

Any advice?

Thanks,
-Kevin

Re: Creating side input map with global window

Posted by Kevin Peterson <ke...@google.com>.
Yep, that is the basic pattern I am looking for. A couple of comments:

1. I can "poke" the side input via pubsub when I want to update the input,
so I don't need any other mechanism to force reload.
2. I don't need to reprocess elements when I get a new side input. As long
as the side input is updated eventually (within reason!), that is fine.

Any suggestions on using existing mechanisms to make this work?

On a side note, the reason I'm looking into side inputs:
I tried just having the DoFns initialize the schema map on the first
element, but that leads to a cascade: element 1 takes a while ->
Dataflow starts more threads -> those also take a while since they also
need a schema map -> more threads -> more maps created -> OOM!

On Wed, Jun 14, 2017 at 12:57 PM, Eugene Kirpichov <ki...@google.com>
wrote:

> Seems related to https://issues.apache.org/jira/browse/BEAM-1197?

Re: Creating side input map with global window

Posted by Eugene Kirpichov <ki...@google.com>.
Seems related to https://issues.apache.org/jira/browse/BEAM-1197?


Re: Reply: Creating side input map with global window

Posted by Lukasz Cwik <lc...@google.com>.
To unit test your function, have it accept a supplier. The default
supplier returns a reference to the static instance, and tests pass in a
different supplier.
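
A minimal sketch of this supplier pattern, using only the JDK. ParseFn stands in for the DoFn and does not extend Beam's DoFn; SerializableSupplier, SharedSchemas, and the placeholder schema strings are hypothetical names introduced for illustration. The supplier is made Serializable on the assumption that the function object holding it will itself be serialized.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

/** A Supplier that can be serialized along with the function holding it. */
interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

/** Hypothetical holder for the process-wide schema map. */
final class SharedSchemas {
    // Placeholder contents; a real pipeline would load these from storage.
    static final Map<String, String> INSTANCE =
            Collections.singletonMap("event", "production-schema");

    private SharedSchemas() {}
}

/** Stand-in for the parsing DoFn; only the injection pattern is shown. */
final class ParseFn implements Serializable {
    private final SerializableSupplier<Map<String, String>> schemas;

    /** Production path: defaults to the shared static instance. */
    ParseFn() {
        this(() -> SharedSchemas.INSTANCE);
    }

    /** Test path: the caller decides where the schemas come from. */
    ParseFn(SerializableSupplier<Map<String, String>> schemas) {
        this.schemas = schemas;
    }

    /** Returns the schema registered under the given name, or null if absent. */
    String schemaFor(String name) {
        return schemas.get().get(name);
    }
}
```

A test would then construct `new ParseFn(() -> localSchemas)` with a map built in the test, while the pipeline keeps using the no-argument constructor.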

On Fri, Jun 23, 2017 at 8:23 AM, Kevin Peterson <ke...@google.com> wrote:

> Hey Lukasz,
>
> I tried using the setup function, but since this is a streaming pipeline, the
> batches tend to be pretty small. I could force the pipeline to batch things
> up, but that feels like something that shouldn't be needed. I was already
> caching between elements within a thread; the problem was at pipeline
> start, or when a new instance was started, since each thread has its own
> cache.
>
> Using a static cache worked!
>
> private static final LoadingCache<BlobId, TypesCache> CACHE =
>         CacheBuilder.newBuilder()
>                     .refreshAfterWrite(30, TimeUnit.MINUTES)
>                     .build(new CacheLoader());
>
> This has gotten me unblocked, but isn't a perfect solution. Because the
> cache is static, I can't set any parameters of it, meaning that it is very
> hard to unit test because it is hard coded to access cloud storage instead
> of a local file.
>
> I tried using a Singleton to hold the cache and fetch it from the DoFn,
> but it seems like the Singleton isn't shared amongst all of the threads. I
> can see from the logs that all of the DoFn calls are on the same worker
> instance but on different threads, and I see a log statement from inside my
> synchronized block for each thread, which shouldn't be possible.
>
> Thoughts?

Re: Reply: Creating side input map with global window

Posted by Kevin Peterson <ke...@google.com>.
Hey Lukasz,

I tried using the setup function, but since this is a streaming pipeline, the
batches tend to be pretty small. I could force the pipeline to batch things
up, but that feels like something that shouldn't be needed. I was already
caching between elements within a thread; the problem was at pipeline
start, or when a new instance was started, since each thread has its own
cache.

Using a static cache worked!

private static final LoadingCache<BlobId, TypesCache> CACHE =
        CacheBuilder.newBuilder()
                    .refreshAfterWrite(30, TimeUnit.MINUTES)
                    .build(new CacheLoader());

This has gotten me unblocked, but isn't a perfect solution. Because the
cache is static, I can't set any parameters of it, meaning that it is very
hard to unit test because it is hard coded to access cloud storage instead
of a local file.

I tried using a Singleton to hold the cache and fetch it from the DoFn, but
it seems like the Singleton isn't shared amongst all of the threads. I can
see from the logs that all of the DoFn calls are on the same worker
instance but on different threads, and I see a log statement from inside my
synchronized block for each thread, which shouldn't be possible.

Thoughts?
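
One possible explanation, not confirmed in this thread: if the runner hands each thread its own deserialized copy of the DoFn, an object held in an instance field is copied along with it, while a static field is shared by every copy loaded by the same class loader. The JDK-only sketch below illustrates the difference; all class names are hypothetical and neither function class extends Beam's DoFn.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical cache object; Serializable so it can travel inside a function object. */
final class SchemaStore implements Serializable {}

/** Holds the store in an instance field: every deserialized copy gets its own store. */
final class FnWithInstanceField implements Serializable {
    final SchemaStore store = new SchemaStore();
}

/** Holds the store in a static holder class: all copies see the same store. */
final class FnWithStaticHolder implements Serializable {
    private static final class Holder {
        // Initialized once, lazily and thread-safely, when Holder is first used.
        static final SchemaStore STORE = new SchemaStore();
    }

    SchemaStore store() {
        return Holder.STORE;
    }
}

final class SerializationDemo {
    private SerializationDemo() {}

    /** Serializes and deserializes a value, roughly as a runner might when cloning a function. */
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

Two round-tripped copies of FnWithInstanceField carry two distinct SchemaStore objects, whereas two copies of FnWithStaticHolder return the same one from store().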


On Thu, Jun 15, 2017 at 6:26 AM, Lukasz Cwik <lc...@google.com> wrote:

> Take a look at DoFn setup/teardown, which is called only once per DoFn
> instance and not per element, so it makes it easier to write initialization code.
>
> Also if the schema map is shared, have you thought of using a single
> static instance of Guava's LoadingCache shared amongst all the DoFn
> instances?
>
> You can also refresh the data stored within the cache periodically.

Re: Reply: Creating side input map with global window

Posted by Lukasz Cwik <lc...@google.com>.
Take a look at DoFn setup/teardown, which is called only once per DoFn
instance and not per element, so it makes it easier to write initialization code.

Also if the schema map is shared, have you thought of using a single static
instance of Guava's LoadingCache shared amongst all the DoFn instances?

You can also refresh the data stored within the cache periodically.
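
A rough, JDK-only sketch of a single static cache that reloads its contents once they are older than a fixed age. It is a simplified stand-in for Guava's LoadingCache, not Guava itself: it reloads synchronously on the calling thread. RefreshingCache, SharedSchemaCache, loadSchemas, the 30-minute age, and the placeholder schema are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Caches one value and reloads it once it is at least maxAgeNanos old. */
final class RefreshingCache<T> {
    private final Supplier<T> loader;
    private final long maxAgeNanos;
    private T value;
    private long loadedAtNanos;
    private boolean loaded;

    RefreshingCache(Supplier<T> loader, long maxAgeNanos) {
        this.loader = loader;
        this.maxAgeNanos = maxAgeNanos;
    }

    /** Returns the cached value, reloading first if it is missing or stale. */
    synchronized T get() {
        long now = System.nanoTime();
        if (!loaded || now - loadedAtNanos >= maxAgeNanos) {
            value = loader.get();
            loadedAtNanos = now;
            loaded = true;
        }
        return value;
    }
}

/** One cache per class loader, shared by every function instance and thread that uses it. */
final class SharedSchemaCache {
    static final RefreshingCache<Map<String, String>> SCHEMAS =
            new RefreshingCache<>(SharedSchemaCache::loadSchemas, TimeUnit.MINUTES.toNanos(30));

    private SharedSchemaCache() {}

    /** Hypothetical loader; a real one would read the schemas from external storage. */
    private static Map<String, String> loadSchemas() {
        return Collections.singletonMap("event", "placeholder-schema");
    }
}
```

Because get() is synchronized, threads that arrive while the first load is running wait for it instead of each building their own map, which is the failure mode described earlier in the thread.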

On Wed, Jun 14, 2017 at 10:39 PM, Kevin Peterson <ke...@google.com> wrote:

> Still gets stuck at the same place :/

Re: Reply: Creating side input map with global window

Posted by Kevin Peterson <ke...@google.com>.
Still gets stuck at the same place :/

On Wed, Jun 14, 2017 at 9:45 PM, Tang Jijun(上海_中台研发部_数据平台部_基础数据部_唐觊隽) <
tangjijun@yhd.com> wrote:

>
>
> .triggering(
>         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))
>         .discardingFiredPanes().withAllowedLateness(Duration.ZERO));
>
>
>
> Try the trigger above

Reply: Creating side input map with global window

Posted by "Tang Jijun (上海_中台研发部_数据平台部_基础数据部_唐觊隽)" <ta...@yhd.com>.

.triggering(
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))
        .discardingFiredPanes().withAllowedLateness(Duration.ZERO));

Try the trigger above
