Posted to issues@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2021/01/27 08:46:00 UTC

[jira] [Commented] (FLINK-15160) Clean up is not applied if there are no incoming events for a key.

    [ https://issues.apache.org/jira/browse/FLINK-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272685#comment-17272685 ] 

Dawid Wysakowicz commented on FLINK-15160:
------------------------------------------

One idea for a fix: we could add a method to the {{TimerService}} for registering a timeout timer:
{code}
public interface TimerService {

    /**
     * Current processing time as returned from {@link org.apache.flink.streaming.api.TimerService}.
     */
    long currentProcessingTime();

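    /**
     * Registers a timer for the given timeout timestamp, so that timed-out partial matches can be
     * pruned even if no further events arrive.
     */
    void registerTimeoutTimer(long timeoutTime);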
}
{code}

and then use this method in {{NFA#computeNextState}} to register a timeout timer when an event is put into a start state:

{code}
                    if (isStartState(computationState)) {
                        startTimestamp = event.getTimestamp();
                        startEventId = event.getEventId();
                        // register timer for timeout in case no more events come for that key
                        timerService.registerTimeoutTimer(startTimestamp + windowTime);
                    } else {
                        startTimestamp = computationState.getStartTimestamp();
                        startEventId = computationState.getStartEventID();
                    }
{code}
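
To make the intended effect concrete, here is a minimal, self-contained sketch of the idea, not the actual CEP implementation. All names in it ({{SketchTimerService}}, {{TimeoutSketch}}, {{onStartEvent}}, {{advanceTimeTo}}) are hypothetical, and it assumes a partial match counts as timed out once the time reaches {{startTimestamp + windowTime}}:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical, simplified stand-in for the proposed TimerService extension. */
interface SketchTimerService {
    long currentProcessingTime();

    void registerTimeoutTimer(long timeoutTime);
}

/** Toy model of one key: tracks partial matches and the timeout timers registered for them. */
class TimeoutSketch implements SketchTimerService {
    private final long windowTime;
    private long now;
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<Long> partialMatchStarts = new ArrayList<>();

    TimeoutSketch(long windowTime) {
        this.windowTime = windowTime;
    }

    @Override
    public long currentProcessingTime() {
        return now;
    }

    @Override
    public void registerTimeoutTimer(long timeoutTime) {
        timers.add(timeoutTime);
    }

    /** An event entering a start state opens a partial match and registers its timeout timer. */
    void onStartEvent(long timestamp) {
        partialMatchStarts.add(timestamp);
        registerTimeoutTimer(timestamp + windowTime);
    }

    /** Advances time without any new event; due timers fire and prune timed-out partial matches. */
    void advanceTimeTo(long time) {
        now = time;
        while (!timers.isEmpty() && timers.first() <= now) {
            long fired = timers.pollFirst();
            // Assumption: a match is timed out once start + windowTime has been reached.
            partialMatchStarts.removeIf(start -> start + windowTime <= fired);
        }
    }

    int pendingPartialMatches() {
        return partialMatchStarts.size();
    }
}
```

In this sketch, a start event at timestamp 5 with a window of 10 registers a timer for 15. Advancing time to 15 with no further events for the key fires the timer and removes the partial match, which is the clean-up that is currently missing.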

> Clean up is not applied if there are no incoming events for a key.
> ------------------------------------------------------------------
>
>                 Key: FLINK-15160
>                 URL: https://issues.apache.org/jira/browse/FLINK-15160
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.8.2, 1.9.1, 1.10.0
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>
> In CepOperator the pruning of timed-out partial matches happens along with feeding events into the NFA, either when unbuffering on a watermark or according to the processing time.
> 1. Processing time
> The state is pruned only with the timestamps of incoming events. If there are no incoming events, no pruning happens.
> 2. Event time
> It is slightly more complicated, but the outcome is similar. We register timers that pop events from the buffer, but we do not register any timers for the point at which a partial match could time out. Therefore, if no more events arrive, we never prune those matches.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)