Posted to user@beam.apache.org by Praveen K Viswanathan <ha...@gmail.com> on 2020/06/29 01:23:47 UTC

DoFn with SideInput

Hi All - I am facing an issue while using *side-input*.

*What am I doing:*
From my main program, I call a custom PTransform with a PCollectionView as
a parameter. Inside the custom PTransform, I pass the PCollectionView as a
side input to a DoFn.

*Issue:*
When I run the pipeline, I expect the log statement inside my DoFn's
processElement to execute, but nothing is logged. If I remove the side
input from my DoFn, the log is printed. I suspect this is related to
windowing or execution order, or to my side input somehow being empty.
I would appreciate it if you could clarify what is going wrong here.

*Code Structure:*


*Main Program:*

PCollectionTuple tuple = input.apply(new FirstTx());

// Get the two tagged outputs of the first transform
PCollection1 = tuple.get(tag1).setCoder(...);
PCollection2 = tuple.get(tag2).setCoder(...);

// Convert PCollection1 to a PCollectionView to use as a side input.
// Note: I need to introduce a global window here because my source is
// unbounded, and View.asList() does a GroupByKey internally, which in
// turn demands a window.
PView = PCollection1
    .apply(Window.<KV<String, CustomObject>>into(new GlobalWindows()) // Everything into the global window.
        .triggering(Repeatedly.forever(DefaultTrigger.of()))
        .discardingFiredPanes())
    .apply(Values.create())
    .apply(View.asList());

// Pass PCollectionView to SecondTx as a param
PCollection3 = PCollection2.apply(new SecondTx(PView));

*SecondTx:*
Inside SecondTx, I take the PView from the constructor (this.PView =
PView) and apply a DoFn:

public PCollection<CustomObject> expand(PCollection<KV<String, KV<String, CustomObject>>> input) {
    input.apply(ParDo.of(new UpdateFn()).withSideInput("SideInput", PView));
    ...
}

// DoFn
class UpdateFn extends DoFn<Map<String, Map<String, Map<String, String>>>, CustomObject> {
    @ProcessElement
    public void processElement(
            @Element Map<String, Map<String, Map<String, String>>> input,
            OutputReceiver<CustomObject> out) {
        Log.of("UpdateFn " + input); // the log statement that is not printed
        out.output(new CustomObject());
    }
}

-- 
Thanks,
Praveen K Viswanathan

Re: DoFn with SideInput

Posted by Praveen K Viswanathan <ha...@gmail.com>.
Thank you, Luke. I changed DefaultTrigger.of() to
AfterProcessingTime.pastFirstElementInPane() and it worked.
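
For anyone landing on this thread later, the change can be sketched roughly as below. This is only a sketch of the windowing step, assuming the Beam Java SDK is on the classpath; the class name SideInputWindowing and the method name globalWithProcessingTimeTrigger are made up for illustration and do not come from the original pipeline.

```java
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;

/** Hypothetical helper holding the windowing used for the side input. */
public final class SideInputWindowing {
  private SideInputWindowing() {}

  /**
   * Global window whose trigger fires in processing time once a pane has
   * received its first element, and then keeps re-arming. Each firing gives
   * the side input data, so the consuming DoFn no longer has to wait for
   * the watermark to reach the end of the global window.
   */
  public static <T> Window<T> globalWithProcessingTimeTrigger() {
    return Window.<T>into(new GlobalWindows())
        // Replaces Repeatedly.forever(DefaultTrigger.of()) from the original post.
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .discardingFiredPanes();
  }
}
```

In the main program from the first message this would presumably be applied as PCollection1.apply(SideInputWindowing.<KV<String, CustomObject>>globalWithProcessingTimeTrigger()).apply(Values.create()).apply(View.asList()). Note that, per the programming guide, when a side input has multiple trigger firings the value from the latest firing is used.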

On Mon, Jun 29, 2020 at 9:09 AM Luke Cwik <lc...@google.com> wrote:

> The UpdateFn won't be invoked till the side input is ready which requires
> either the watermark to pass the end of the global window + allowed
> lateness (to show that the side input is empty) or at least one firing to
> populate it with data. See this general section on side inputs[1] and some
> useful patterns[2] (there are some examples for how to get globally
> windowed side inputs to work).
>
> 1: https://beam.apache.org/documentation/programming-guide/#side-inputs
> 2: https://beam.apache.org/documentation/patterns/side-inputs/

-- 
Thanks,
Praveen K Viswanathan

Re: DoFn with SideInput

Posted by Luke Cwik <lc...@google.com>.
The UpdateFn won't be invoked till the side input is ready which requires
either the watermark to pass the end of the global window + allowed
lateness (to show that the side input is empty) or at least one firing to
populate it with data. See this general section on side inputs[1] and some
useful patterns[2] (there are some examples for how to get globally
windowed side inputs to work).

1: https://beam.apache.org/documentation/programming-guide/#side-inputs
2: https://beam.apache.org/documentation/patterns/side-inputs/
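
To make the readiness rule above concrete, here is a toy model in plain Java. It is not Beam code and does not reflect how any runner is implemented; the class SideInputGate and its methods are hypothetical names used only to illustrate why main-input elements sit unprocessed until the side input has had at least one firing.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Beam): main-input elements wait until the side input is ready. */
public final class SideInputGate {
  private final List<String> pending = new ArrayList<>();
  private final List<String> processed = new ArrayList<>();
  private List<String> sideInput = null; // null means "no firing yet", i.e. not ready

  /** A main-input element arrives; it is processed only if the side input is ready. */
  public void onMainElement(String element) {
    if (sideInput == null) {
      pending.add(element); // held back, so the "DoFn" is not invoked yet
    } else {
      processed.add(element + " with " + sideInput);
    }
  }

  /**
   * The side input becomes ready: either a trigger fired with some data, or
   * the window was shown to be empty (modelled here as an empty pane).
   * Elements that were held back are released.
   */
  public void onSideInputFiring(List<String> pane) {
    sideInput = new ArrayList<>(pane);
    for (String element : pending) {
      processed.add(element + " with " + sideInput);
    }
    pending.clear();
  }

  public List<String> processed() {
    return processed;
  }

  public int pendingCount() {
    return pending.size();
  }
}
```

In this model, calling onMainElement before any onSideInputFiring leaves processed() empty, which mirrors the missing log statement in the original post. With a global window and the default trigger on an unbounded source, the first "firing" effectively never comes.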
