Posted to user@flink.apache.org by KristoffSC <kr...@gmail.com> on 2020/01/02 16:30:27 UTC

Late outputs for Session Window

Hi all,
In my pipeline I never see any late-data side output from my session window
(Flink 1.9.1).

What I have is:


messageStream
    .keyBy(tradeKeySelector)
    // session window whose gap is computed per element by TradeAggregationGapExtractor
    .window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
    .sideOutputLateData(lateTradeMessages)
    .process(new CumulativeTransactionOperator())
    .name("Aggregate Transaction Builder");

TradeAggregationGapExtractor implements SessionWindowTimeGapExtractor and
returns 5 seconds.
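To make the dynamic-gap behaviour concrete, here is a plain-Java sketch (no Flink dependency; `SessionMergeSketch`, `Session` and `sessions` are hypothetical names) of how session windows form for a single key: each element opens a window [timestamp, timestamp + gap) and overlapping windows merge into one session. It is a simplification of what `withDynamicGap` does, assuming only strictly overlapping windows merge.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

/**
 * Simplified single-key model of session windows with a dynamic gap.
 * Each element opens the proto-window [ts, ts + gap(element)); overlapping
 * windows are merged. Illustrative only, not Flink's implementation.
 */
final class SessionMergeSketch {

    /** Half-open session interval [start, end) in milliseconds. */
    static final class Session {
        long start;
        long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Assigns elements (in arrival order) to sessions, merging overlaps. */
    static <T> List<Session> sessions(List<T> elements,
                                      ToLongFunction<T> timestampOf,
                                      ToLongFunction<T> gapOf) {
        List<Session> result = new ArrayList<>();
        for (T e : elements) {
            long ts = timestampOf.applyAsLong(e);
            Session merged = new Session(ts, ts + gapOf.applyAsLong(e));
            List<Session> kept = new ArrayList<>();
            for (Session s : result) {
                // Absorb every existing session that overlaps the new window.
                if (s.start < merged.end && merged.start < s.end) {
                    merged.start = Math.min(merged.start, s.start);
                    merged.end = Math.max(merged.end, s.end);
                } else {
                    kept.add(s);
                }
            }
            kept.add(merged);
            result = kept;
        }
        result.sort((a, b) -> Long.compare(a.start, b.start));
        return result;
    }

    public static void main(String[] args) {
        // Timestamps in ms; every element uses a constant 5 s gap.
        List<Long> timestamps = List.of(0L, 3_000L, 10_000L);
        System.out.println(sessions(timestamps, t -> t, t -> 5_000L));
        // prints [[0, 8000), [10000, 15000)]
    }
}
```

The element at 10000 arrives after the first session's gap has elapsed, so it simply opens a second session; nothing about it is "late".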

Further I have:

messageStream.getSideOutput(lateTradeMessages)
    .keyBy(tradeKeySelector)
    .process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
        @Override
        public void processElement(EnrichedMessage value, Context ctx,
                Collector<Transaction> out) throws Exception {
            System.out.println("Process Late messages For Aggregation");
            out.collect(new Transaction());
        }
    })
    .name("Process Late messages For Aggregation");


The problem is that I never see "Process Late messages For Aggregation"
printed when I'm sending messages with the same key.

When the session window closes and I "immediately" send a new message for
the same key, it triggers a new session window instead of going to the late
side output.
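The behaviour above follows from how lateness is defined: in Flink only event-time window assigners have late data, because lateness is measured against the watermark. A processing-time session window assigns each element to a window based on the current wall clock, so `sideOutputLateData` never receives anything. A minimal plain-Java sketch of the check, assuming a window counts as late once `windowEnd - 1 + allowedLateness <= watermark` (`LatenessSketch` is a hypothetical name, not Flink code):

```java
/**
 * Simplified model of the lateness test applied before an element is sent
 * to the late-data side output. Illustrative only.
 */
final class LatenessSketch {

    /** Late once the watermark passed the window's last timestamp plus allowed lateness. */
    static boolean isWindowLate(boolean eventTime, long windowEnd,
                                long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1; // last timestamp inside [start, end)
        // Processing-time windows have no watermark, so they are never late.
        return eventTime && maxTimestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long gap = 5_000;
        long ts = 1_000;            // element timestamp
        long windowEnd = ts + gap;  // element's own window [1000, 6000)
        long watermark = 20_000;    // watermark already far ahead

        System.out.println(isWindowLate(true, windowEnd, 0, watermark));  // true
        System.out.println(isWindowLate(false, windowEnd, 0, watermark)); // false
    }
}
```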

I'm not sure what I'm doing wrong here.

What I would like to achieve here is to catch "late events" and try to
reprocess them against the state that was built for the "on time" events of
this window or, if that is impossible, report the late events to a special sink.
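The two options described here correspond to Flink's windowed-stream API: `allowedLateness(...)` keeps window state for a while, so a late event is added to the window and the window fires again with the earlier contents, and `sideOutputLateData(...)` receives events that arrive after that. The hypothetical `LateRoutingSketch` below models the decision in plain Java, simplified to an element's own window [ts, ts + gap) with no session merging:

```java
/**
 * Hypothetical router for one event-time element. Simplification: the
 * element's window is its own [ts, ts + gap); session merging is ignored.
 */
final class LateRoutingSketch {

    enum Route { ON_TIME, LATE_UPDATE, LATE_SINK }

    static Route route(long ts, long gap, long allowedLateness, long watermark) {
        long maxTimestamp = ts + gap - 1;
        if (maxTimestamp > watermark) {
            return Route.ON_TIME;      // window has not fired yet
        }
        if (maxTimestamp + allowedLateness > watermark) {
            return Route.LATE_UPDATE;  // state still kept: re-fire including this event
        }
        return Route.LATE_SINK;        // state dropped: late side output / special sink
    }

    public static void main(String[] args) {
        long gap = 5_000, allowedLateness = 10_000, watermark = 20_000;
        for (long ts : new long[] {18_000, 10_000, 2_000}) {
            System.out.println(ts + " -> " + route(ts, gap, allowedLateness, watermark));
        }
        // 18000 -> ON_TIME, 10000 -> LATE_UPDATE, 2000 -> LATE_SINK
    }
}
```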

I would appreciate any help.
However, it seems I do not have any late events at all.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Late outputs for Session Window

Posted by KristoffSC <kr...@gmail.com>.
Hi,
thank you for your SO comment [1]. You are right. Sorry, I misunderstood
the "late message" concept.
In fact, I was never sending "late events" that would fall into a window
that had just ended.

Thank you for your comments and clarification. 


[1]
https://stackoverflow.com/questions/59570445/late-outputs-missing-for-flinks-session-window/59642942#59642942


Re: Late outputs for Session Window

Posted by Arvid Heise <ar...@ververica.com>.
Hi Kristoff,

please check my SO comment and reply.

https://stackoverflow.com/questions/59570445/late-outputs-missing-for-flinks-session-window/59642942#59642942

It's not entirely clear to me why it's not working but I also don't quite
understand your use case yet (data examples missing).

Best,

Arvid

On Fri, Jan 3, 2020 at 1:03 PM KristoffSC <kr...@gmail.com>
wrote:

Re: Late outputs for Session Window

Posted by KristoffSC <kr...@gmail.com>.
After following the suggestion from SO, I made a few changes, so now I'm
using event time. Watermarks are progressing; I've checked them in Flink's
metrics. The window operator is triggered, but I still don't see any late
outputs.
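The watermark assigner in the code below returns `System.currentTimeMillis()`, so the watermark tracks the wall clock rather than the events. Under that assumption, a message only becomes late if it reaches the window operator after its whole session gap has elapsed on the wall clock. A simplified plain-Java model (hypothetical `WallClockWatermarkSketch`; `watermarkLagMs` stands in for the 1000 ms auto-watermark interval; merging and allowed lateness are ignored):

```java
/**
 * Why a wall-clock watermark rarely yields late events: only transport
 * delay longer than the session gap makes an element late. Illustrative only.
 */
final class WallClockWatermarkSketch {

    static boolean isLate(long creationTime, long arrivalWallClock,
                          long gap, long watermarkLagMs) {
        long watermark = arrivalWallClock - watermarkLagMs; // last emitted watermark
        long maxTimestamp = creationTime + gap - 1;         // end of the element's own session
        return maxTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long gap = 5_000, lag = 1_000;
        long created = 100_000;
        // Sent "immediately": arrives 50 ms after creation, not late.
        System.out.println(isLate(created, created + 50, gap, lag));    // false
        // Delayed in transit by 7 s: its session already closed, late.
        System.out.println(isLate(created, created + 7_000, gap, lag)); // true
    }
}
```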


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);


DataStream<RawMessage> rawBusinessTransaction = env
        .addSource(new FlinkKafkaConsumer<>("business",
                new JSONKeyValueDeserializationSchema(false), properties))
        .map(new KafkaTransactionObjectMapOperator())
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                // the watermark follows the wall clock, not messageCreationTime
                return new Watermark(System.currentTimeMillis());
            }

            @Override
            public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
                // event time is the message creation timestamp
                return element.messageCreationTime;
            }
        })
        .name("Kafka Transaction Raw Data Source.");

messageStream
        .keyBy(tradeKeySelector)
        .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
        .sideOutputLateData(lateTradeMessages)
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");
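A common alternative is to derive the watermark from the events themselves, as Flink's `BoundedOutOfOrdernessTimestampExtractor` does: the watermark is the largest timestamp seen so far minus a fixed bound. The plain-Java class below (hypothetical name `BoundedOutOfOrdernessSketch`, not Flink code) models that idea:

```java
/**
 * Plain-Java model of a bounded-out-of-orderness periodic watermark:
 * watermark = max event timestamp seen - bound, never moving backwards.
 */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Per element: returns its event time and tracks the maximum. */
    long extractTimestamp(long eventTime) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTime);
        return eventTime;
    }

    /** Periodically: the current watermark (monotonic). */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet; avoids underflow
        }
        long candidate = maxTimestampSeen - maxOutOfOrderness;
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(2_000);
        for (long t : new long[] {10_000, 12_000, 11_000}) {
            wm.extractTimestamp(t);
        }
        System.out.println(wm.currentWatermark()); // 10000
    }
}
```

For example, with a watermark of 10000 and the 5-second gap, an event stamped 4000 opens [4000, 9000); its last timestamp 8999 is already behind the watermark, so unless it merges into a session that is still open, it would count as late.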