Posted to user@flink.apache.org by Alexis Sarda-Espinosa <al...@microfocus.com> on 2022/02/02 11:32:22 UTC

RE: Determinism of interval joins

Well, for those who might be interested in the semantics I mentioned, I implemented a custom operator that seems to achieve what I want by mostly ignoring the actual timestamps from the side stream's watermarks. However, it kind of depends on the fact that my main stream comes from a previous window and is watermarked with "windowEnd - 1" (thus "timestamp1 + 1" below).

public class PrioritizedWatermarkStreamIntervalJoinOperator extends IntervalJoinOperator<...> {
    private static final long serialVersionUID = 1L;

    private long maxTimestamp1 = Long.MIN_VALUE;
    private long maxTimestamp2 = Long.MIN_VALUE;

    public PrioritizedWatermarkStreamIntervalJoinOperator(...) {
        super(...);
    }

    @Override
    public void processWatermark1(Watermark mark) throws Exception {
        if (mark.getTimestamp() > maxTimestamp1) {
            maxTimestamp1 = mark.getTimestamp();
        }
        super.processWatermark1(mark);
        maybeProcessWatermark2(mark, mark.getTimestamp(), maxTimestamp2);
    }

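    // Forwards a watermark to input 2 in only two cases: the final MAX_WATERMARK once the
    // other input has also reached it, or "timestamp1 + 1" while input 2 is ahead of input 1.
    private void maybeProcessWatermark2(Watermark mark, long timestamp1, long maxTimestampForComparison) throws Exception {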
        if (mark.equals(Watermark.MAX_WATERMARK) && maxTimestampForComparison == Watermark.MAX_WATERMARK.getTimestamp()) {
            super.processWatermark2(Watermark.MAX_WATERMARK);
        } else if (maxTimestamp2 > maxTimestamp1) {
            if (timestamp1 == Long.MAX_VALUE) {
                LOG.warn("Trying to bump timestamp1 would result in overflow, skipping.");
                return;
            }
            super.processWatermark2(new Watermark(timestamp1 + 1L));
        }
    }

    @Override
    public void processWatermark2(Watermark mark) throws Exception {
        if (mark.getTimestamp() > maxTimestamp2) {
            maxTimestamp2 = mark.getTimestamp();
        }
        maybeProcessWatermark2(mark, maxTimestamp1, maxTimestamp1);
    }
}
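
To make it easier to trace what the operator above forwards to its second input, here is a minimal standalone model of the same branching logic. It is only a sketch: it has no Flink dependency, the class name PrioritizedWatermarkModel and the list forwardedToInput2 are made up for illustration, and it records the values that would be passed to super.processWatermark2 instead of calling it.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of the watermark handling above (hypothetical names, no Flink).
class PrioritizedWatermarkModel {
    // Same numeric value as Watermark.MAX_WATERMARK's timestamp.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    long maxTimestamp1 = Long.MIN_VALUE;
    long maxTimestamp2 = Long.MIN_VALUE;

    // Stands in for the calls to super.processWatermark2(...).
    final List<Long> forwardedToInput2 = new ArrayList<>();

    void processWatermark1(long mark) {
        maxTimestamp1 = Math.max(maxTimestamp1, mark);
        // The real operator also calls super.processWatermark1(mark) here.
        maybeProcessWatermark2(mark, mark, maxTimestamp2);
    }

    void processWatermark2(long mark) {
        maxTimestamp2 = Math.max(maxTimestamp2, mark);
        maybeProcessWatermark2(mark, maxTimestamp1, maxTimestamp1);
    }

    private void maybeProcessWatermark2(long mark, long timestamp1, long maxTimestampForComparison) {
        if (mark == MAX_WATERMARK && maxTimestampForComparison == MAX_WATERMARK) {
            forwardedToInput2.add(MAX_WATERMARK);
        } else if (maxTimestamp2 > maxTimestamp1) {
            if (timestamp1 == Long.MAX_VALUE) {
                return; // bumping timestamp1 would overflow, skip
            }
            forwardedToInput2.add(timestamp1 + 1L);
        }
    }

    public static void main(String[] args) {
        PrioritizedWatermarkModel op = new PrioritizedWatermarkModel();
        op.processWatermark1(99L);            // main stream: windowEnd - 1
        op.processWatermark2(500L);           // side stream is ahead, forwards 99 + 1
        op.processWatermark1(199L);           // forwards 199 + 1
        op.processWatermark2(Long.MAX_VALUE); // side stream finished, still capped at 200
        op.processWatermark1(Long.MAX_VALUE); // both finished, forwards MAX_WATERMARK
        System.out.println(op.forwardedToInput2);
        // prints [100, 200, 200, 9223372036854775807]
    }
}
```

In this model the side input never sees a watermark beyond the main input's watermark plus one, regardless of what the side stream itself reports.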

Regards,
Alexis.

From: Alexis Sarda-Espinosa <al...@microfocus.com>
Sent: Saturday, 29 January 2022 13:47
To: Robert Metzger <me...@gmail.com>
Cc: user@flink.apache.org
Subject: RE: Determinism of interval joins

I think I spoke too soon when I said my watermark strategies were like the included ones. My generators mark themselves as idle when they start, and stay that way as long as they haven't seen any event at all. In the tests, I presume a variable number of events (and watermarks) from stream1 were consumed before anything from stream2 was, so by the time stream2 emitted a watermark to mark itself as not idle, it was already too late and everything was dropped. I debugged some of the operators and could see that a lot of inputs were considered late because they were processed when the internal watermark service already had Long.MAX_VALUE as its current watermark. If I change this idleness behavior, I do see changes in the test's output.
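
The effect described above can be reproduced with a small model that does not use Flink at all. This is a sketch under two assumptions: the operator's event-time clock is the minimum watermark over the inputs that are not idle, and an element is late when its timestamp is below that clock (and the clock has moved past Long.MIN_VALUE). The class IdleInputModel and its methods are hypothetical names for illustration only.

```java
// Simplified model (not Flink code) of a two-input operator's event-time clock.
class IdleInputModel {
    private final long[] watermark = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final boolean[] idle = {false, false};
    private long current = Long.MIN_VALUE;

    void markIdle(int input) {
        idle[input] = true;
        advance();
    }

    void onWatermark(int input, long timestamp) {
        idle[input] = false; // a watermark makes the input active again
        watermark[input] = Math.max(watermark[input], timestamp);
        advance();
    }

    // Assumption: idle inputs are ignored when combining watermarks.
    private void advance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < 2; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermark[i]);
            }
        }
        if (anyActive && min > current) {
            current = min;
        }
    }

    boolean isLate(long timestamp) {
        return current != Long.MIN_VALUE && timestamp < current;
    }

    // stream1 (input 0) is fully consumed before any element of stream2 (input 1) shows up.
    static int countLate(boolean stream2StartsIdle) {
        IdleInputModel op = new IdleInputModel();
        if (stream2StartsIdle) {
            op.markIdle(1);
        }
        op.onWatermark(0, 100L);
        op.onWatermark(0, Long.MAX_VALUE); // bounded source finished
        int late = 0;
        for (long ts : new long[] {50L, 150L, 250L}) {
            if (op.isLate(ts)) {
                late++;
            }
        }
        return late;
    }

    public static void main(String[] args) {
        System.out.println("stream2 idle at start:   " + countLate(true) + " late");  // 3 late
        System.out.println("stream2 active at start: " + countLate(false) + " late"); // 0 late
    }
}
```

When stream2 starts idle, stream1 alone drives the clock to Long.MAX_VALUE, so every stream2 element that arrives afterwards counts as late. When stream2 is not idle, its missing watermark holds the clock back and nothing is dropped.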

When running in real time, I definitely need to mark some streams as idle after some time because I don't expect all of them to receive data constantly. However, the non-real-time scenario is also relevant for me, and not just for testing: if something crashes in the system and the pipeline suddenly needs to process a backlog, it would be nice if the semantics were well defined. Ideally, this would mean (for two-input operators in general, I imagine) that when an operator knows that all streams from one input have passed a certain watermark (based on slide/tumble time), it would switch and consume from the other stream to check whether it's idle or not. I suppose this wouldn't be a definite guarantee either, since the data from the different streams may take some time to reach the different operators (latency and whatnot), but it would still be useful.
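
As a rough illustration of the kind of ordering meant here, the sketch below always reads from the input whose next element has the smaller timestamp, so the consumption order depends only on the data and not on which input happens to run faster. This is a hypothetical policy for a backlog where both inputs are already sorted by timestamp and fully available, not a description of how Flink schedules its inputs. The class TimeAlignedReader and the "input:timestamp" tags are made up for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical time-aligned consumption of two sorted, bounded inputs.
class TimeAlignedReader {

    // Returns the consumption order as tags like "1:10" (input 1, timestamp 10).
    static List<String> consume(List<Long> input1, List<Long> input2) {
        Deque<Long> q1 = new ArrayDeque<>(input1);
        Deque<Long> q2 = new ArrayDeque<>(input2);
        List<String> order = new ArrayList<>();
        while (!q1.isEmpty() || !q2.isEmpty()) {
            // Prefer input 1 on ties; switch to input 2 once input 1 has moved past it.
            boolean takeFirst = q2.isEmpty() || (!q1.isEmpty() && q1.peek() <= q2.peek());
            if (takeFirst) {
                order.add("1:" + q1.poll());
            } else {
                order.add("2:" + q2.poll());
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<String> order = consume(Arrays.asList(10L, 20L, 40L), Arrays.asList(15L, 30L));
        System.out.println(order); // prints [1:10, 2:15, 1:20, 2:30, 1:40]
    }
}
```

With live streams the head of each input is not always known, which is where latency and idleness come back into the picture.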

I imagine the details are more complex and I'm oversimplifying a bit (I don't know how the network stack works), but I would think these kinds of semantics are commonly expected when handling multiple streams that need joins and so on. What do you think?

Regards,
Alexis.

From: Robert Metzger <me...@gmail.com>
Sent: Friday, 28 January 2022 14:49
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Determinism of interval joins

Instead of using `reinterpretAsKeyedStream`, can you use keyBy and see if the behavior becomes deterministic?

On Thu, Jan 27, 2022 at 9:49 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
I'm not sure if the issue in [1] is relevant since it mentions the Table API, but it could be. Since stream1 and stream2 in my example have a long chain of operators behind them, I presume they might "run" at very different paces.

Oh and, in the context of my unit tests, watermarks should be deterministic: the input file is sorted, and the watermark strategies should essentially behave like the monotonous generator.

[1] https://issues.apache.org/jira/browse/FLINK-24466

Regards,
Alexis.

________________________________
From: Alexis Sarda-Espinosa <al...@microfocus.com>
Sent: Thursday, January 27, 2022 1:30 PM
To: user@flink.apache.org
Subject: Determinism of interval joins


Hi everyone,

I'm seeing a lack of determinism in unit tests when using an interval join. I am testing with both Flink 1.11.3 and 1.14.3 and the relevant bits of my pipeline look a bit like this:

keySelector1 = ...
keySelector2 = ...

rightStream = stream1
  .flatMap(...)
  .keyBy(keySelector1)
  .assignTimestampsAndWatermarks(strategy1)

leftStream = stream2
  .keyBy(keySelector2)
  .assignTimestampsAndWatermarks(strategy2)

joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream, keySelector2)
  .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream, keySelector1))
  .between(Time.minutes(-10L), Time.milliseconds(0L))
  .lowerBoundExclusive()
  .process(new IntervalJoinFunction(...))



---
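
For reference, the bounds configured in the join above can be read as a plain predicate on the two timestamps: an element of rightStream joins with an element of leftStream when its timestamp lies in the interval (leftTs - 10 min, leftTs], with the lower bound excluded because of lowerBoundExclusive(). The following is a minimal sketch in plain Java; the class and method names are made up for illustration and timestamps are assumed to be epoch milliseconds.

```java
import java.time.Duration;

// Hypothetical helper mirroring between(-10 min, 0 ms) with lowerBoundExclusive().
class IntervalJoinBounds {
    static final long LOWER_BOUND_MS = -Duration.ofMinutes(10).toMillis(); // -600000
    static final long UPPER_BOUND_MS = 0L;

    // True if a right element with timestamp rightTs falls in the join interval of leftTs.
    static boolean joins(long leftTs, long rightTs) {
        return rightTs > leftTs + LOWER_BOUND_MS   // lower bound is exclusive
            && rightTs <= leftTs + UPPER_BOUND_MS; // upper bound is inclusive
    }

    public static void main(String[] args) {
        long leftTs = 1_000_000L;
        System.out.println(joins(leftTs, 1_000_000L)); // true: same timestamp
        System.out.println(joins(leftTs, 400_001L));   // true: just inside the interval
        System.out.println(joins(leftTs, 400_000L));   // false: exactly 10 minutes earlier
        System.out.println(joins(leftTs, 1_000_001L)); // false: right element is newer
    }
}
```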



In my tests, I have a bounded source that loads demo data from a file and simulates the stream, with a sink that collects results in memory. In the specific case of my IntervalJoinFunction, I'm seeing that it's called a different number of times in a non-deterministic way: sometimes I see 14 calls to its processElement() method, other times 8, and other times none at all, in which case my output is empty. I count the calls by checking my logs with some tracing.



Does anyone know why this is? Maybe I'm doing something wrong, particularly with reinterpretAsKeyedStream.

Regards,
Alexis.