You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Manas Kale <ma...@gmail.com> on 2020/08/11 06:33:24 UTC

Event time based disconnection detection logic

Hi,
I have a bunch of devices that keep sending heartbeat messages. I want to
make an operator that emits messages when a device disconnects and when a
device stops being disconnected.
A device is considered disconnected if we don't receive any heartbeat for
more than some TIMEOUT duration.
This seemed like a good candidate for session windows, but I am not sure
how I can express the inverse logic (i.e. detecting periods of inactivity
instead of activity) using Flink's operators.
I want to use event time for all processing and ideally want to achieve
this behaviour using a single operator.

So I am trying to implement a custom processfunction that, on every
heartbeat:

   - Deletes any previous event time timer
   - Registers a new timer to fire at heartbeat.timestamp + TIMEOUT

The basic idea is that every new heartbeat will keep pushing the timer
forward. Only when heartbeats stop arriving does the timer fire, indicating
the start of a disconnected state.
Code:

public class IUDisconnectedStateDetector extends
KeyedProcessFunction<IUMonitorKey, IUHeartbeat, IUSessionMessage> {

    // Tracks if this monitor is disconnected or not.
    private ValueState<Boolean> isDisconnectedStateStore;
    // Tracks which timer was registered.
    private ValueState<Long> registeredTimerStateStore;

    private final Logger LOGGER =
LoggerFactory.getLogger(IUDisconnectedStateDetector.class);

    // Called by the Flink runtime before starting this operator. We
initialize the state stores here.
    @Override
    public void open(Configuration parameters) throws Exception {
        isDisconnectedStateStore = getRuntimeContext().getState(new
ValueStateDescriptor<Boolean>(
                DISCONNECTED_STATE_STORE_NAME, Boolean.class));
        registeredTimerStateStore = getRuntimeContext().getState(new
ValueStateDescriptor<Long>(
                REGISTERED_TIMER_STATE_STORE_NAME, Long.class));
    }

    @Override
    public void processElement(IUHeartbeat heartbeat, Context ctx,
Collector<IUSessionMessage> out) throws Exception {
        Boolean isDisconnected = isDisconnectedStateStore.value();
        LOGGER.info("Watermark: " + heartbeat + ", isDisconnected : "
+ isDisconnected
                +" last registered timer :" +
registeredTimerStateStore.value());


        // If this is the first message for this monitor or is the
first message after a disconnection.
        if (isDisconnected == null || isDisconnected == Boolean.TRUE) {
            // Delete previous timer.
            if (registeredTimerStateStore.value() != null)

ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());

            // Register a timer that will fire in the future if no
further events are received.
            long timerFiringTimestamp = heartbeat.getTimestamp() +
DISCONNECTED_TIMEOUT;
            ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
            registeredTimerStateStore.update(timerFiringTimestamp);

            // Emit a message indicating END of the disconnected state.
            IUSessionMessage message = new IUSessionMessage(
                    new
IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),
"dummy", "dummy"),
                    new IUSessionInfo(heartbeat.getTimestamp(),
IUStatus.ENDED, IUEventType.NO_VALUE));
            out.collect(message);
            LOGGER.info(message.getSessionInfo().toString());
            // Update the state store.
            isDisconnectedStateStore.update(Boolean.FALSE);
        }
    }


    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
Collector<IUSessionMessage> out) throws Exception {
        if (isDisconnectedStateStore.value() == Boolean.FALSE) {
            // If this timer fires that means no message was received
from the monitor for some timeout duration.
            // Update the state store.
            isDisconnectedStateStore.update(Boolean.TRUE);

            // Emit a message indicating START of the disconnected
state. Note that since this is applicable for a monitor,
            IUSessionMessage message = new IUSessionMessage(
                    new
IUMonitorFeatureKey(ctx.getCurrentKey().getMonitorName(),
"dummyFeatureName", "dummyDeviceId"),
                    new IUSessionInfo(timestamp, IUStatus.STARTED,
IUEventType.NO_VALUE));
            out.collect(message);

            LOGGER.info(message.getSessionInfo().toString());
        }
    }
}

*However, the above code does not behave as expected - the timer fires even
when (a) it has received heartbeats within the timeout and (b) I have the
code to delete it*. So, my questions:

   - Am I deleting the timer incorrectly? I use a state store to keep track
   of registered timer's timestamps and use that value when deleting.
   - Am I overcomplicating things? Can this be achieved using Flink's
   inbuild session windowing operators?

Thanks!

Re: Event time based disconnection detection logic

Posted by Timo Walther <tw...@apache.org>.
Sometimes it's not easy to spot the obvious ;-)

Great that it works now. Let us know if you have further questions.

Regards,
Timo

On 11.08.20 10:51, Manas Kale wrote:
> Hi Timo,
> I got it, the issue was a (silly) mistake on my part. I unnecessarily 
> put all the processElement() logic inside the if condition. The if() 
> condition is there because I want to emit a disconnected STOPPED message 
> only once.
> So the correct code is :
> 
>    @Override
> public void processElement(IUHeartbeat heartbeat, Context ctx, Collector<IUSessionMessage> out)throws Exception {
>          Boolean isDisconnected =isDisconnectedStateStore.value();
> // LOGGER.info("Watermark: " + ctx.timerService().currentWatermark() + " 
> Processing timestamp : "+ heartbeat.getTimestamp() + ", isDisconnected : 
> " + isDisconnected
> // +" last registered timer :" + registeredTimerStateStore.value());
> 
> // Delete previous timer.
> if (registeredTimerStateStore.value() !=null)
>              ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());
> 
>          // Register a timer that will fire in the future if no further events 
> are received.
> long timerFiringTimestamp = heartbeat.getTimestamp() +DISCONNECTED_TIMEOUT;
>          ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
>          registeredTimerStateStore.update(timerFiringTimestamp);
> 
>          // If this is the first message for this monitor or is the first message 
> after a disconnection.
> if (isDisconnected ==null || isDisconnected == Boolean.TRUE) {
>              // Emit a message indicating END of the disconnected state.
> IUSessionMessage message =new IUSessionMessage(
>                      new IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),"dummy","dummy"),
>                      new IUSessionInfo(heartbeat.getTimestamp(), IUStatus.ENDED, IUEventType.NO_VALUE));
>              out.collect(message);
>              LOGGER.info(message.getSessionInfo().toString());
>          }
> 
>          // Update the state store.
> isDisconnectedStateStore.update(Boolean.FALSE);
>      }
> 
> 
> This produces the expected output.
> Also, I will assume that this is the best way to solve my problem - I 
> can't use Flink's session windows. Let me know if anyone has any other 
> ideas though!
> 
> Thank you for your time and quick response!
> 
> 
> On Tue, Aug 11, 2020 at 1:45 PM Timo Walther <twalthr@apache.org 
> <ma...@apache.org>> wrote:
> 
>     Hi Manas,
> 
>     at the first glance your code looks correct to me. I would investigate
>     if your keys and watermarks are correct. Esp. the watermark frequency
>     could be an issue. If watermarks are generated at the same time as the
>     heartbeats itself, it might be the case that the timers fire first
>     before the process() function is called which resets the timer.
> 
>     Maybe you can give us more information how watermarks are generated?
> 
>     Regards,
>     Timo
> 
>     On 11.08.20 08:33, Manas Kale wrote:
>      > Hi,
>      > I have a bunch of devices that keep sending heartbeat messages. I
>     want
>      > to make an operator that emits messages when a device disconnects
>     and
>      > when a device stops being disconnected.
>      > A device is considered disconnected if we don't receive any
>     heartbeat
>      > for more than some TIMEOUT duration.
>      > This seemed like a good candidate for session windows, but I am
>     not sure
>      > how I can express the inverse logic (i.e. detecting periods of
>      > inactivity instead of activity) using Flink's operators.
>      > I want to use event time for all processing and ideally want to
>     achieve
>      > this behaviour using a single operator.
>      >
>      > So I am trying to implement a custom processfunction that, on every
>      > heartbeat:
>      >
>      >   * Deletes any previous event time timer
>      >   * Registers a new timer to fire at heartbeat.timestamp + TIMEOUT
>      >
>      > The basic idea is that every new heartbeat will keep pushing the
>     timer
>      > forward. Only when heartbeats stop arriving does the timer fire,
>      > indicating the start of a disconnected state.
>      > Code:
>      >
>      > public class IUDisconnectedStateDetectorextends
>     KeyedProcessFunction<IUMonitorKey, IUHeartbeat, IUSessionMessage> {
>      >
>      >      // Tracks if this monitor is disconnected or not.
>      > private ValueState<Boolean>isDisconnectedStateStore;
>      >      // Tracks which timer was registered.
>      > private ValueState<Long>registeredTimerStateStore;
>      >
>      >      private final LoggerLOGGER =
>     LoggerFactory.getLogger(IUDisconnectedStateDetector.class);
>      >
>      >      // Called by the Flink runtime before starting this
>     operator. We
>      > initialize the state stores here.
>      > @Override
>      > public void open(Configuration parameters)throws Exception {
>      >          isDisconnectedStateStore =
>     getRuntimeContext().getState(new ValueStateDescriptor<Boolean>(
>      >                  DISCONNECTED_STATE_STORE_NAME, Boolean.class));
>      >          registeredTimerStateStore =
>     getRuntimeContext().getState(new ValueStateDescriptor<Long>(
>      >                  REGISTERED_TIMER_STATE_STORE_NAME, Long.class));
>      >      }
>      >
>      >      @Override
>      > public void processElement(IUHeartbeat heartbeat, Context ctx,
>     Collector<IUSessionMessage> out)throws Exception {
>      >          Boolean isDisconnected =isDisconnectedStateStore.value();
>      >          LOGGER.info("Watermark: " + heartbeat +", isDisconnected
>     : " + isDisconnected
>      >                  +" last registered timer :"
>     +registeredTimerStateStore.value());
>      >
>      >
>      >          // If this is the first message for this monitor or is
>     the first message
>      > after a disconnection.
>      > if (isDisconnected ==null || isDisconnected == Boolean.TRUE) {
>      >              // Delete previous timer.
>      > if (registeredTimerStateStore.value() !=null)
>      >                 
>     ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());
>      >
>      >              // Register a timer that will fire in the future if
>     no further events
>      > are received.
>      > long timerFiringTimestamp = heartbeat.getTimestamp()
>     +DISCONNECTED_TIMEOUT;
>      >             
>     ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
>      >              registeredTimerStateStore.update(timerFiringTimestamp);
>      >
>      >              // Emit a message indicating END of the disconnected
>     state.
>      > IUSessionMessage message =new IUSessionMessage(
>      >                      new
>     IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),"dummy","dummy"),
>      >                      new IUSessionInfo(heartbeat.getTimestamp(),
>     IUStatus.ENDED, IUEventType.NO_VALUE));
>      >              out.collect(message);
>      >              LOGGER.info(message.getSessionInfo().toString());
>      >              // Update the state store.
>      > isDisconnectedStateStore.update(Boolean.FALSE);
>      >          }
>      >      }
>      >
>      >
>      >      @Override
>      > public void onTimer(long timestamp, OnTimerContext ctx,
>     Collector<IUSessionMessage> out)throws Exception {
>      >          if (isDisconnectedStateStore.value() == Boolean.FALSE) {
>      >              // If this timer fires that means no message was
>     received from the
>      > monitor for some timeout duration.
>      > // Update the state store.
>      > isDisconnectedStateStore.update(Boolean.TRUE);
>      >
>      >              // Emit a message indicating START of the
>     disconnected state. Note that
>      > since this is applicable for a monitor,
>      > IUSessionMessage message =new IUSessionMessage(
>      >                      new
>     IUMonitorFeatureKey(ctx.getCurrentKey().getMonitorName(),"dummyFeatureName","dummyDeviceId"),
>      >                      new IUSessionInfo(timestamp,
>     IUStatus.STARTED, IUEventType.NO_VALUE));
>      >              out.collect(message);
>      >
>      >              LOGGER.info(message.getSessionInfo().toString());
>      >          }
>      >      }
>      > }
>      >
>      > *However, the above code does not behave as expected - the timer
>     fires
>      > even when (a) it has received heartbeats within the timeout and
>     (b) I
>      > have the code to delete it*. So, my questions:
>      >
>      >   * Am I deleting the timer incorrectly? I use a state store to keep
>      >     track of registered timer's timestamps and use that value
>     when deleting.
>      >   * Am I overcomplicating things? Can this be achieved using Flink's
>      >     inbuild session windowing operators?
>      >
>      > Thanks!
> 


Re: Event time based disconnection detection logic

Posted by Manas Kale <ma...@gmail.com>.
Hi Timo,
I got it, the issue was a (silly) mistake on my part. I unnecessarily put
all the processElement() logic inside the if condition. The if() condition
is there because I want to emit a disconnected STOPPED message only once.
So the correct code is :

  @Override
    public void processElement(IUHeartbeat heartbeat, Context ctx,
Collector<IUSessionMessage> out) throws Exception {
        Boolean isDisconnected = isDisconnectedStateStore.value();
//        LOGGER.info("Watermark: " +
ctx.timerService().currentWatermark() + " Processing timestamp : "+
heartbeat.getTimestamp() + ", isDisconnected : " + isDisconnected
//                +" last registered timer :" +
registeredTimerStateStore.value());

        // Delete previous timer.
        if (registeredTimerStateStore.value() != null)
            ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());

        // Register a timer that will fire in the future if no further
events are received.
        long timerFiringTimestamp = heartbeat.getTimestamp() +
DISCONNECTED_TIMEOUT;
        ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
        registeredTimerStateStore.update(timerFiringTimestamp);

        // If this is the first message for this monitor or is the
first message after a disconnection.
        if (isDisconnected == null || isDisconnected == Boolean.TRUE) {
            // Emit a message indicating END of the disconnected state.
            IUSessionMessage message = new IUSessionMessage(
                    new
IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),
"dummy", "dummy"),
                    new IUSessionInfo(heartbeat.getTimestamp(),
IUStatus.ENDED, IUEventType.NO_VALUE));
            out.collect(message);
            LOGGER.info(message.getSessionInfo().toString());
        }

        // Update the state store.
        isDisconnectedStateStore.update(Boolean.FALSE);
    }


This produces the expected output.
Also, I will assume that this is the best way to solve my problem - I can't
use Flink's session windows. Let me know if anyone has any other ideas
though!

Thank you for your time and quick response!


On Tue, Aug 11, 2020 at 1:45 PM Timo Walther <tw...@apache.org> wrote:

> Hi Manas,
>
> at the first glance your code looks correct to me. I would investigate
> if your keys and watermarks are correct. Esp. the watermark frequency
> could be an issue. If watermarks are generated at the same time as the
> heartbeats itself, it might be the case that the timers fire first
> before the process() function is called which resets the timer.
>
> Maybe you can give us more information how watermarks are generated?
>
> Regards,
> Timo
>
> On 11.08.20 08:33, Manas Kale wrote:
> > Hi,
> > I have a bunch of devices that keep sending heartbeat messages. I want
> > to make an operator that emits messages when a device disconnects and
> > when a device stops being disconnected.
> > A device is considered disconnected if we don't receive any heartbeat
> > for more than some TIMEOUT duration.
> > This seemed like a good candidate for session windows, but I am not sure
> > how I can express the inverse logic (i.e. detecting periods of
> > inactivity instead of activity) using Flink's operators.
> > I want to use event time for all processing and ideally want to achieve
> > this behaviour using a single operator.
> >
> > So I am trying to implement a custom processfunction that, on every
> > heartbeat:
> >
> >   * Deletes any previous event time timer
> >   * Registers a new timer to fire at heartbeat.timestamp + TIMEOUT
> >
> > The basic idea is that every new heartbeat will keep pushing the timer
> > forward. Only when heartbeats stop arriving does the timer fire,
> > indicating the start of a disconnected state.
> > Code:
> >
> > public class IUDisconnectedStateDetectorextends
> KeyedProcessFunction<IUMonitorKey, IUHeartbeat, IUSessionMessage> {
> >
> >      // Tracks if this monitor is disconnected or not.
> > private ValueState<Boolean>isDisconnectedStateStore;
> >      // Tracks which timer was registered.
> > private ValueState<Long>registeredTimerStateStore;
> >
> >      private final LoggerLOGGER =
> LoggerFactory.getLogger(IUDisconnectedStateDetector.class);
> >
> >      // Called by the Flink runtime before starting this operator. We
> > initialize the state stores here.
> > @Override
> > public void open(Configuration parameters)throws Exception {
> >          isDisconnectedStateStore = getRuntimeContext().getState(new
> ValueStateDescriptor<Boolean>(
> >                  DISCONNECTED_STATE_STORE_NAME, Boolean.class));
> >          registeredTimerStateStore = getRuntimeContext().getState(new
> ValueStateDescriptor<Long>(
> >                  REGISTERED_TIMER_STATE_STORE_NAME, Long.class));
> >      }
> >
> >      @Override
> > public void processElement(IUHeartbeat heartbeat, Context ctx,
> Collector<IUSessionMessage> out)throws Exception {
> >          Boolean isDisconnected =isDisconnectedStateStore.value();
> >          LOGGER.info("Watermark: " + heartbeat +", isDisconnected : " +
> isDisconnected
> >                  +" last registered timer :"
> +registeredTimerStateStore.value());
> >
> >
> >          // If this is the first message for this monitor or is the
> first message
> > after a disconnection.
> > if (isDisconnected ==null || isDisconnected == Boolean.TRUE) {
> >              // Delete previous timer.
> > if (registeredTimerStateStore.value() !=null)
> >
> ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());
> >
> >              // Register a timer that will fire in the future if no
> further events
> > are received.
> > long timerFiringTimestamp = heartbeat.getTimestamp()
> +DISCONNECTED_TIMEOUT;
> >
> ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
> >              registeredTimerStateStore.update(timerFiringTimestamp);
> >
> >              // Emit a message indicating END of the disconnected state.
> > IUSessionMessage message =new IUSessionMessage(
> >                      new
> IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),"dummy","dummy"),
> >                      new IUSessionInfo(heartbeat.getTimestamp(),
> IUStatus.ENDED, IUEventType.NO_VALUE));
> >              out.collect(message);
> >              LOGGER.info(message.getSessionInfo().toString());
> >              // Update the state store.
> > isDisconnectedStateStore.update(Boolean.FALSE);
> >          }
> >      }
> >
> >
> >      @Override
> > public void onTimer(long timestamp, OnTimerContext ctx,
> Collector<IUSessionMessage> out)throws Exception {
> >          if (isDisconnectedStateStore.value() == Boolean.FALSE) {
> >              // If this timer fires that means no message was received
> from the
> > monitor for some timeout duration.
> > // Update the state store.
> > isDisconnectedStateStore.update(Boolean.TRUE);
> >
> >              // Emit a message indicating START of the disconnected
> state. Note that
> > since this is applicable for a monitor,
> > IUSessionMessage message =new IUSessionMessage(
> >                      new
> IUMonitorFeatureKey(ctx.getCurrentKey().getMonitorName(),"dummyFeatureName","dummyDeviceId"),
> >                      new IUSessionInfo(timestamp, IUStatus.STARTED,
> IUEventType.NO_VALUE));
> >              out.collect(message);
> >
> >              LOGGER.info(message.getSessionInfo().toString());
> >          }
> >      }
> > }
> >
> > *However, the above code does not behave as expected - the timer fires
> > even when (a) it has received heartbeats within the timeout and (b) I
> > have the code to delete it*. So, my questions:
> >
> >   * Am I deleting the timer incorrectly? I use a state store to keep
> >     track of registered timer's timestamps and use that value when
> deleting.
> >   * Am I overcomplicating things? Can this be achieved using Flink's
> >     inbuild session windowing operators?
> >
> > Thanks!
>
>

Re: Event time based disconnection detection logic

Posted by Timo Walther <tw...@apache.org>.
Hi Manas,

at the first glance your code looks correct to me. I would investigate 
if your keys and watermarks are correct. Esp. the watermark frequency 
could be an issue. If watermarks are generated at the same time as the 
heartbeats itself, it might be the case that the timers fire first 
before the process() function is called which resets the timer.

Maybe you can give us more information how watermarks are generated?

Regards,
Timo

On 11.08.20 08:33, Manas Kale wrote:
> Hi,
> I have a bunch of devices that keep sending heartbeat messages. I want 
> to make an operator that emits messages when a device disconnects and 
> when a device stops being disconnected.
> A device is considered disconnected if we don't receive any heartbeat 
> for more than some TIMEOUT duration.
> This seemed like a good candidate for session windows, but I am not sure 
> how I can express the inverse logic (i.e. detecting periods of 
> inactivity instead of activity) using Flink's operators.
> I want to use event time for all processing and ideally want to achieve 
> this behaviour using a single operator.
> 
> So I am trying to implement a custom processfunction that, on every 
> heartbeat:
> 
>   * Deletes any previous event time timer
>   * Registers a new timer to fire at heartbeat.timestamp + TIMEOUT
> 
> The basic idea is that every new heartbeat will keep pushing the timer 
> forward. Only when heartbeats stop arriving does the timer fire, 
> indicating the start of a disconnected state.
> Code:
> 
> public class IUDisconnectedStateDetectorextends KeyedProcessFunction<IUMonitorKey, IUHeartbeat, IUSessionMessage> {
> 
>      // Tracks if this monitor is disconnected or not.
> private ValueState<Boolean>isDisconnectedStateStore;
>      // Tracks which timer was registered.
> private ValueState<Long>registeredTimerStateStore;
> 
>      private final LoggerLOGGER = LoggerFactory.getLogger(IUDisconnectedStateDetector.class);
> 
>      // Called by the Flink runtime before starting this operator. We 
> initialize the state stores here.
> @Override
> public void open(Configuration parameters)throws Exception {
>          isDisconnectedStateStore = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>(
>                  DISCONNECTED_STATE_STORE_NAME, Boolean.class));
>          registeredTimerStateStore = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
>                  REGISTERED_TIMER_STATE_STORE_NAME, Long.class));
>      }
> 
>      @Override
> public void processElement(IUHeartbeat heartbeat, Context ctx, Collector<IUSessionMessage> out)throws Exception {
>          Boolean isDisconnected =isDisconnectedStateStore.value();
>          LOGGER.info("Watermark: " + heartbeat +", isDisconnected : " + isDisconnected
>                  +" last registered timer :" +registeredTimerStateStore.value());
> 
> 
>          // If this is the first message for this monitor or is the first message 
> after a disconnection.
> if (isDisconnected ==null || isDisconnected == Boolean.TRUE) {
>              // Delete previous timer.
> if (registeredTimerStateStore.value() !=null)
>                  ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());
> 
>              // Register a timer that will fire in the future if no further events 
> are received.
> long timerFiringTimestamp = heartbeat.getTimestamp() +DISCONNECTED_TIMEOUT;
>              ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
>              registeredTimerStateStore.update(timerFiringTimestamp);
> 
>              // Emit a message indicating END of the disconnected state.
> IUSessionMessage message =new IUSessionMessage(
>                      new IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),"dummy","dummy"),
>                      new IUSessionInfo(heartbeat.getTimestamp(), IUStatus.ENDED, IUEventType.NO_VALUE));
>              out.collect(message);
>              LOGGER.info(message.getSessionInfo().toString());
>              // Update the state store.
> isDisconnectedStateStore.update(Boolean.FALSE);
>          }
>      }
> 
> 
>      @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<IUSessionMessage> out)throws Exception {
>          if (isDisconnectedStateStore.value() == Boolean.FALSE) {
>              // If this timer fires that means no message was received from the 
> monitor for some timeout duration.
> // Update the state store.
> isDisconnectedStateStore.update(Boolean.TRUE);
> 
>              // Emit a message indicating START of the disconnected state. Note that 
> since this is applicable for a monitor,
> IUSessionMessage message =new IUSessionMessage(
>                      new IUMonitorFeatureKey(ctx.getCurrentKey().getMonitorName(),"dummyFeatureName","dummyDeviceId"),
>                      new IUSessionInfo(timestamp, IUStatus.STARTED, IUEventType.NO_VALUE));
>              out.collect(message);
> 
>              LOGGER.info(message.getSessionInfo().toString());
>          }
>      }
> }
> 
> *However, the above code does not behave as expected - the timer fires 
> even when (a) it has received heartbeats within the timeout and (b) I 
> have the code to delete it*. So, my questions:
> 
>   * Am I deleting the timer incorrectly? I use a state store to keep
>     track of registered timer's timestamps and use that value when deleting.
>   * Am I overcomplicating things? Can this be achieved using Flink's
>     inbuild session windowing operators?
> 
> Thanks!