Posted to user@beam.apache.org by "Almeida, Julius" <Ju...@intuit.com> on 2020/09/23 19:16:18 UTC

Extract PCollection from PCollectionView

Hi Team,



I am running my Beam pipeline on the Flink runner and trying to consume a side input consisting of records read from RDS.



1. Side Input Generation:

PCollectionView<JdbcIO.Read<EventSet>> eventSet = pipeline
       .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5L)))
       .apply(
           Window.<Long>into(new GlobalWindows())
               .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
               .discardingFiredPanes())
       .apply("Read from RDS", ParDo.of(new DataSource(runtimeConfig.getConfig("db"))))
       .apply(View.asSingleton());


2. Side Input Consumption:

pipeline.apply("Display Event", ParDo.of(new PrintRecord(eventSet)).withSideInputs(eventSet))



3. Accessing records:

class PrintRecord extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext context) {
    JdbcIO.Read<EventSet> event = context.sideInput(sideInput);
    event. ?  // can’t read event object here, it’s a POJO
  }
}





Thanks,

Julius






Re: Extract PCollection from PCollectionView

Posted by Luke Cwik <lc...@google.com>.
You're using a singleton side input for a view that is triggered
multiple times. This will start throwing errors once the side input
contains more than one record.

Try using a list and getting the last element.
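
A minimal sketch of the "get the last element" step, in plain Java so it runs without Beam on the classpath. The class name LatestSnapshot and the method latest are hypothetical helpers, not part of Beam. The assumption is that the view is built with View.asList() instead of View.asSingleton(), so that context.sideInput(...) inside the DoFn returns a java.util.List of the records fired so far, and that list is what would be passed to the helper. PCollections are generally unordered, so whether the last list position really holds the newest pane is an assumption worth verifying on your runner.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper, not part of Beam: picks the last entry of a list-valued side input. */
final class LatestSnapshot {
    private LatestSnapshot() {}

    /**
     * Returns the last element of the list, or an empty Optional when the list
     * is null, has no elements, or its last element is null.
     */
    static <T> Optional<T> latest(List<T> snapshots) {
        if (snapshots == null || snapshots.isEmpty()) {
            return Optional.empty();
        }
        return Optional.ofNullable(snapshots.get(snapshots.size() - 1));
    }

    public static void main(String[] args) {
        // Stand-in for what a list side input might hold after three trigger firings.
        List<String> fired = Arrays.asList("events@00:00", "events@00:05", "events@00:10");
        System.out.println(latest(fired).orElse("<no snapshot yet>"));
    }
}
```

Returning an Optional rather than throwing keeps the DoFn from failing on an empty list; the caller decides whether to skip the element or fall back to a default.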

There is currently no support for having a combiner inside a
PCollectionView. You can use a Combine transform earlier, but you will
still run into the same problem: either the PCollectionView ends up
containing multiple entries, or you never produce output and the side
input is never ready.
