Posted to user@beam.apache.org by Peter Benedikovic <PB...@zetaglobal.com> on 2020/06/30 15:29:17 UTC

DoFn with OnTimer and watermark

Hi,

I would like to ask for advice. I am trying to join and deduplicate events from Kafka. Say we have events of three types: A, B and C. If B or C arrives first, it has to wait for A before it is emitted downstream. If A does not arrive within some predefined duration (gap), the waiting B/C events are dropped. Here is a pseudo-skeleton of the implementation:

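import org.apache.beam.sdk.coders.BooleanCoder
import org.apache.beam.sdk.state.{StateSpecs, TimeDomain, TimerSpecs, ValueState}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{OnTimer, ProcessElement, StateId, TimerId}
import org.joda.time.Duration

// IN, OUT and State are placeholders for the real element and state types.
class JoinFn(gap: Duration) extends DoFn[IN, OUT] {

  private final val stateId = "state"
  private final val timerStateId = "timerState"
  private final val timerId = "expiry"

  // state used to keep track of already seen events
  @StateId(stateId)
  private val stateSpec = StateSpecs.value[State]()

  // flag remembering whether the expiry timer is already set
  @StateId(timerStateId)
  private val timerStateSpec = StateSpecs.value(BooleanCoder.of())

  // event-time timer: fires once the watermark passes its timestamp
  @TimerId(timerId)
  private val expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)

  override def getAllowedTimestampSkew: Duration = gap

  @ProcessElement
  def processElement(…): Unit = {
    // emit state downstream/updating state
    …
  }

  @OnTimer(timerId)
  def onExpiry(@StateId(stateId) state: ValueState[State],
               @StateId(timerStateId) timerState: ValueState[java.lang.Boolean]): Unit = {
    // just increasing counters of dropped events
    …
    state.clear()
    timerState.clear()
  }
}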

This solution works fine when we process events in real time (starting from the latest offset). When we reprocess a backlog, we see a lot of dropped B/C events because the watermark advances quickly. So two runs over the same event-time range yield different results. I was wondering whether there is a better approach that would work in both situations.
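
To make the difference between the two runs concrete, here is a minimal sketch in plain Scala with no Beam dependency. It is only a model of the behaviour described above, not the real DoFn: the names JoinSimulation, Event, Step and run are hypothetical, timestamps are plain milliseconds, the watermark after each element is supplied by hand, and I assume the expiry is set to the timestamp of the first waiting B/C plus gap.

```scala
import scala.collection.mutable

object JoinSimulation {
  // Hypothetical event model: kind is "A", "B" or "C"; ts is event time in ms.
  final case class Event(key: String, kind: String, ts: Long)
  // One input element plus the watermark assumed to hold right after it.
  final case class Step(event: Event, watermarkAfter: Long)
  final case class Result(emitted: Vector[Event], dropped: Vector[Event])

  // Per-key state, loosely mirroring the two ValueStates and the timer.
  private final class KeyState {
    var seenA: Boolean = false
    var pending: Vector[Event] = Vector.empty
    var expiry: Option[Long] = None
  }

  def run(steps: Seq[Step], gap: Long): Result = {
    val states = mutable.Map.empty[String, KeyState]
    var emitted = Vector.empty[Event]
    var dropped = Vector.empty[Event]

    steps.foreach { step =>
      val e = step.event
      val s = states.getOrElseUpdate(e.key, new KeyState)
      if (e.kind == "A") {
        // A releases everything that was waiting for it.
        emitted = (emitted :+ e) ++ s.pending
        s.seenA = true
        s.pending = Vector.empty
        s.expiry = None
      } else if (s.seenA) {
        emitted = emitted :+ e
      } else {
        // B/C arrived first: buffer it and arm the expiry once (assumption).
        s.pending = s.pending :+ e
        if (s.expiry.isEmpty) s.expiry = Some(e.ts + gap)
      }
      // Simplified event-time timer: fire when the watermark reaches the expiry.
      states.values.foreach { st =>
        if (st.expiry.exists(_ <= step.watermarkAfter)) {
          dropped = dropped ++ st.pending
          st.pending = Vector.empty
          st.expiry = None
        }
      }
    }
    Result(emitted, dropped)
  }

  def main(args: Array[String]): Unit = {
    val b = Event("k", "B", 100L)
    val a = Event("k", "A", 500L)
    // Real time: the watermark trails the events slightly.
    println(run(Seq(Step(b, 90L), Step(a, 490L)), gap = 1000L))
    // Backlog: the watermark has already jumped far ahead when B is processed.
    println(run(Seq(Step(b, 5000L), Step(a, 5000L)), gap = 1000L))
  }
}
```

With the same two events, the first run emits both A and B, while the second run drops B before A is processed, which is the kind of discrepancy we observe between real-time processing and backlog reprocessing.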

Thanks for any advice,
Peter



