Posted to user@beam.apache.org by Lee Seng Cheong <se...@gmail.com> on 2021/09/13 06:35:42 UTC

[Question][Java] - Aggregation operations on a PeriodicImpulse source do not produce any result

Hi

I am trying out a simple streaming pipeline written in Java and executed on
the DirectRunner.

I wanted to use the PeriodicImpulse class as a streaming source that works
out of the box. I previously tried the GenerateSequence class, but I ran
into the issue described in [1]. The TestStream class is useful, but it
requires some setup.

The issue I am running into is this: aggregate transforms (e.g.
GroupByKey()) that are downstream of a PeriodicImpulse source do not output
any elements. I am not sure what I am doing wrong. When I tried the same
aggregation operations, such as GroupByKey() and Count(), with
GenerateSequence and TestStream as the source, they did produce output.

Beam Information:
SDK - 2.28.0, 2.32.0
Runner - DirectRunner

Attached below is sample code:

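    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.PeriodicImpulse;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public static void main(String[] args) {

        // Start 15 seconds from now, to ensure there is no late data
        Instant basetime = Instant.now().plus(Duration.standardSeconds(15));
        Pipeline p = Pipeline.create();

        p.apply("Periodic Impulse",
                PeriodicImpulse.create()
                        .startAt(basetime)
                        .withInterval(Duration.standardSeconds(1)))
                // I want a constant key, as I am only interested in
                // elements belonging to a window
                .apply(WithKeys.of("Constant"))
                .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
                .apply(GroupByKey.create())
                .apply("Print grouped elements", ParDo.of(
                        new DoFn<KV<String, Iterable<Instant>>, KV<String, Iterable<Instant>>>() {

                    // I expected this method to be triggered on every
                    // element, but even after a minute there was no output
                    @ProcessElement
                    public void process(
                            @Element KV<String, Iterable<Instant>> elem,
                            OutputReceiver<KV<String, Iterable<Instant>>> out) {
                        System.out.println("Received element.");
                        out.output(elem);
                    }
                }));
        p.run().waitUntilFinish();
    }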
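
To make the expected behaviour concrete, here is a minimal sketch in plain Java (no Beam dependency) of how one-second impulses would be grouped into 10-second fixed windows. It assumes windows aligned to the epoch and an impulse emitted at the start time and every second after it. The class name, method names and the start time of 15 seconds past the epoch are hypothetical and are not part of Beam.

```java
import java.util.Map;
import java.util.TreeMap;

public class FixedWindowSketch {

    // Start of the fixed window that contains the timestamp.
    // Assumption: windows are aligned to the epoch (offset 0).
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Counts how many impulses fall into each window, keyed by window start.
    public static Map<Long, Integer> countPerWindow(
            long startMillis, long intervalMillis, int impulses, long sizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (int i = 0; i < impulses; i++) {
            long ts = startMillis + i * intervalMillis;
            counts.merge(windowStart(ts, sizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical input: 30 impulses, one per second, starting at 15 s.
        long size = 10_000L;
        Map<Long, Integer> counts = countPerWindow(15_000L, 1_000L, 30, size);
        counts.forEach((start, n) -> System.out.println(
                "window [" + start + ", " + (start + size) + "): " + n + " elements"));
    }
}
```

With this input the sketch yields four windows: the first and last hold 5 elements each, and the two full windows in between hold 10 each. In the pipeline above, GroupByKey would be expected to emit one grouped element per window once that window closes, which is the output that never appears.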

Links:
[1] https://stackoverflow.com/questions/66537646/apache-beam-generatesequence-does-not-emit-elements-at-specified-rates

Warm Regards,
Seng Cheong