Posted to user@flink.apache.org by Rion Williams <ri...@gmail.com> on 2021/03/01 17:55:55 UTC

Re: Handling Data Separation / Watermarking from Kafka in Flink

Hey David et al.,

I have one follow-up question. I've been putting together some
integration/unit tests to verify that things work as expected with
finite datasets (e.g. a text file with several hundred records that are
serialized, injected into Kafka, and processed through the pipeline). Is
there a good strategy for handling these finite sets? Once I've read
all of the records that I care about, I need to trigger something that
explicitly flushes the windows / evicts the messages, and I'm not sure
what a good approach would be. I don't think there's an easy way to
simulate processing-time delays other than an explicit Thread.sleep()
call before asynchronously injecting some messages into the running
pipeline.

Any recommendations for handling something like this? I'd imagine that
it's a fairly common use case for testing, but maybe not?

Thanks much!

Rion

On Sat, Feb 27, 2021 at 10:56 AM Rion Williams <ri...@gmail.com>
wrote:

> Thanks David,
>
> I figured that the correct approach would obviously be to adopt a keying
> strategy upstream to ensure the same data that I used as a key downstream
> fell on the same partition (ensuring the ordering guarantees I’m looking
> for).
>
> I’m guessing implementation-wise, when I would normally evict a window
> after some event time and allowed lateness, I could set a timer or just
> explicitly keep the window open for some additional time to allow for out
> of order data to make its way into the window.
>
> Either way - I think the keying is probably the right approach, but I
> wanted to consider any other options should that become an issue upstream.
>
> Thanks!
>
> Rion
>
> On Feb 27, 2021, at 10:21 AM, David Anderson <da...@apache.org> wrote:
>
> 
> Rion,
>
> If you can arrange for each tenant's events to be in only one kafka
> partition, that should be the best way to simplify the processing you need
> to do. Otherwise, a simple change that may help would be to increase the
> bounded delay you use in calculating your own per-tenant watermarks,
> thereby making late events less likely.
>
> David
>
> On Sat, Feb 27, 2021 at 3:29 AM Rion Williams <ri...@gmail.com>
> wrote:
>
>> David and Timo,
>>
>> Firstly, thank you both so much for your contributions and advice. I
>> believe I’ve implemented things along the lines that you both detailed and
>> things appear to work just as expected (e.g. I can see things arriving,
>> being added to windows, discarding late records, and ultimately writing out
>> files as expected).
>>
>> With that said, I have one question / issue that I’ve run into with
>> handling the data coming from my Kafka topic. Currently, my tenant/source
>> (i.e. my key) may be distributed across the 10 partitions of my Kafka topic.
>> With the way that I’m consuming from this topic (with a Kafka Consumer), it
>> looks like my data is arriving in a mixed order. That seems to advance my own
>> watermarks (those stored in my ValueState) too early: later data may arrive
>> before earlier data and cause my windows to be evicted.
>>
>> I’m currently using `withNoWatermarks()` along with a custom
>> timestamp assigner to handle all of my timestamping, but is there a
>> mechanism to handle the mixed ordering across partitions in this scenario
>> at the Flink level?
>>
>> I know the answer here likely lies with Kafka and adopting a better
>> keying strategy to ensure the same tenant/source (my key) lands on the same
>> partition, which by definition ensures ordering. I’m just wondering if
>> there’s some mechanism to accomplish this post-reading from Kafka in Flink
>> within my pipeline to handle things in a similar fashion?
>>
>> Again - thank you both so much, I’m loving the granularity and control
>> that Flink has been providing me over other streaming technologies I’ve
>> used in the past. I’m totally sold on it and am looking forward to doing
>> more incredible things with it.
>>
>> Best regards,
>>
>> Rion
>>
>> On Feb 26, 2021, at 4:36 AM, David Anderson <da...@apache.org> wrote:
>>
>> 
>> Yes indeed, Timo is correct -- I am proposing that you not use timers at
>> all. Watermarks and event-time timers go hand in hand -- and neither
>> mechanism can satisfy your requirements.
>>
>> You can instead put all of the timing logic in the processElement method
>> -- effectively emulating what you would get if Flink were to offer per-key
>> watermarking.
>>
>> The reason that the PseudoWindow example is using MapState is that for
>> each key/tenant, more than one window can be active simultaneously. This
>> occurs because the event stream is out-of-order with respect to time, so
>> events for the "next window" are probably being processed before "the
>> previous" window is complete. And if you want to accommodate allowed
>> lateness, the requirement to have several windows open at once becomes even
>> more important.
>>
>> MapState gives you a per-tenant hashmap, where each entry in that map
>> corresponds to an open window for some particular tenant, where the map's
>> key is the timestamp for a window, and the value is whatever state you want
>> that window to hold.
>>
>> Best regards,
>> David
>>
>>
>>
>>
>> On Fri, Feb 26, 2021 at 9:44 AM Timo Walther <tw...@apache.org> wrote:
>>
>>> Hi Rion,
>>>
>>> I think what David was referring to is that you do the entire time
>>> handling yourself in the process function. That means not using the
>>> `context.timerService()` or `onTimer()` that Flink provides but calling
>>> your own logic based on the timestamps that enter your process function
>>> and the stored state.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 26.02.21 00:29, Rion Williams wrote:
>>> > 
>>> > Hi David,
>>> >
>>> > Thanks for your prompt reply, it was very helpful and the PseudoWindow
>>> > example is excellent. I believe it closely aligns with an approach that
>>> > I was tinkering with but seemed to be missing a few key pieces. In my
>>> > case, I'm essentially going to want to be aggregating the messages that
>>> > are coming into the window (a simple string-concatenation aggregation
>>> > would work). Would I need another form of state to hold that, as looking
>>> > through this example with naive eyes, it seems that this function is
>>> > currently storing multiple windows in state via the MapState provided:
>>> >
>>> > // Keyed, managed state, with an entry for each window, keyed by the window's end time.
>>> > // There is a separate MapState object for each driver.
>>> > private transient MapState<Long, Float> sumOfTips;
>>> >
>>> > If I wanted to perform an aggregation for each key/tenant, would a
>>> > MapState be appropriate? Such as a MapState<Long, String> if I was doing
>>> > a string aggregation, so that within my processElement function I could
>>> > use something similar for building these aggregations and ultimately
>>> > triggering them:
>>> >
>>> > // Keep track of a tenant/source specific watermark
>>> > private lateinit var currentWatermark: ValueState<Long>
>>> > // Keep track of the contents of each of the windows where the key represents the close
>>> > // of the window and the contents represents an accumulation of the records for that window
>>> > private lateinit var windowContents: MapState<Long, String>
>>> >
>>> > If that's the case, this is what I've thrown together thus far and I
>>> > feel like it's moving in the right direction:
>>> >
>>> > class MagicWindow(private val duration: Long, private val lateness: Long):
>>> >      KeyedProcessFunction<String, Event, FileOutput>(){
>>> >
>>> >      // Keep track of a tenant/source specific watermark
>>> >      private lateinit var currentWatermark: ValueState<Long>
>>> >      // Keep track of the contents of each of the windows where the key represents the close
>>> >      // of the window and the contents represents an accumulation of the records for that window
>>> >      private lateinit var windowContents: MapState<Long, String>
>>> >
>>> >      override fun open(config: Configuration) {
>>> >          currentWatermark = runtimeContext.getState(watermark)
>>> >          currentWatermark.update(Long.MIN_VALUE)
>>> >      }
>>> >
>>> >      override fun processElement(element: Event, context: Context, out: Collector<FileOutput>) {
>>> >          // Resolve the event time
>>> >          val eventTime: Long = getEventTime(element)
>>> >
>>> >          // Update watermark (if applicable)
>>> >          if (currentWatermark.value() < eventTime){
>>> >              currentWatermark.update(eventTime)
>>> >          }
>>> >
>>> >          // Define a timer for this window
>>> >          val timerService = context.timerService()
>>> >
>>> >          if (eventTime <= timerService.currentWatermark()) {
>>> >              // This event is late; its window has already been triggered.
>>> >          } else {
>>> >              // Determine the "actual" window closure and start a timer for it
>>> >              // (eventTime + window
>>> >              val endOfWindow = eventTime - (eventTime % duration) + duration - 1
>>> >
>>> >              // Schedule a callback for when the window has been completed.
>>> >              timerService.registerEventTimeTimer(endOfWindow)
>>> >
>>> >              // Add this element to the corresponding aggregation for this window
>>> >              windowContents.put(endOfWindow, windowContents[endOfWindow] + "$element")
>>> >          }
>>> >      }
>>> >
>>> >      override fun onTimer(timestamp: Long, context: OnTimerContext, out: Collector<FileOutput>) {
>>> >          val key = context.currentKey
>>> >          val currentAggregation: String = windowContents.get(timestamp)
>>> >
>>> >          // Output things here and clear the current aggregation for this
>>> >          // tenant/source combination in this window
>>> >      }
>>> >
>>> >      companion object {
>>> >          private val watermark = ValueStateDescriptor(
>>> >              "watermark",
>>> >              Long::class.java
>>> >          )
>>> >
>>> >          private val windowContents = MapStateDescriptor(
>>> >              "window-contents",
>>> >              Long::class.java,
>>> >              String::class.java
>>> >          )
>>> >
>>> >          fun getEventTime(element: Event): Long {
>>> >              return Instant(element.`source$1`.createdTimestamp).millis
>>> >          }
>>> >      }
>>> > }
>>> >
>>> > Is something glaringly off with this? I’ll need to do some additional
>>> > reading on the timers, but any additional clarification would be greatly
>>> > appreciated.
>>> >
>>> > Thanks so much for your initial response again!
>>> >
>>> > Rion
>>> >
>>> >> On Feb 25, 2021, at 3:27 PM, David Anderson <da...@apache.org> wrote:
>>> >>
>>> >> 
>>> >> Rion,
>>> >>
>>> >> What you want isn't really achievable with the APIs you are using.
>>> >> Without some sort of per-key (per-tenant) watermarking -- which Flink
>>> >> doesn't offer -- the watermarks and windows for one tenant can be held
>>> >> up by the failure of another tenant's events to arrive in a timely manner.
>>> >>
>>> >> However, your pipeline is pretty straightforward, and it shouldn't be
>>> >> particularly difficult to accomplish what you want. What you can do is
>>> >> to ignore the built-in watermarking and windowing APIs, and build
>>> >> equivalent functionality in the form of a KeyedProcessFunction.
>>> >>
>>> >> The Flink docs include an example [1] showing how to implement your
>>> >> own tumbling event time windows with a process function. That
>>> >> implementation assumes you can rely on watermarks for triggering the
>>> >> windows; you'll have to do that differently.
>>> >>
>>> >> What you can do instead is to track, in ValueState, the largest
>>> >> timestamp you've seen so far (for each key/tenant). Whenever that
>>> >> advances, you can subtract the bounded-out-of-orderness duration from
>>> >> that timestamp, and then check to see if the resulting value is now
>>> >> large enough to trigger any of the windows for that key/tenant.
>>> >>
>>> >> Handling allowed lateness should be pretty straightforward.
>>> >>
>>> >> Hope this helps,
>>> >> David
>>> >>
>>> >> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
>>> >>
>>> >> On Thu, Feb 25, 2021 at 9:05 PM Rion Williams <rionmonster@gmail.com> wrote:
>>> >>
>>> >>     Hey folks, I have a somewhat high-level/advice question regarding
>>> >>     Flink and if it has the mechanisms in place to accomplish what I’m
>>> >>     trying to do. I’ve spent a good bit of time using Apache Beam, but
>>> >>     recently pivoted over to native Flink simply because some of the
>>> >>     connectors weren’t as mature or didn’t support some of the
>>> >>     functionality that I needed.
>>> >>
>>> >>     Basically - I have a single Kafka topic with 10 partitions that
>>> >>     I’m consuming from. This is a multi-tenant topic containing data
>>> >>     that comes in at various times from various tenants and is not at
>>> >>     all guaranteed to be in order, at least with regards to “event
>>> >>     time”, which is what I care about.
>>> >>
>>> >>     What I’m trying to accomplish is this: *Given a multi-tenant topic
>>> >>     with records eventually distributed across partitions, is it
>>> >>     possible to consume and window each of these records independently
>>> >>     of one another without one tenant potentially influencing another
>>> >>     and write out to separate files per tenant/source (i.e. some other
>>> >>     defined property on the records)?*
>>> >>
>>> >>     My pipeline currently looks something like this:
>>> >>
>>> >>     @JvmStatic
>>> >>     fun main(args: Array<String>) {
>>> >>         val pipeline = StreamExecutionEnvironment
>>> >>             .getExecutionEnvironment()
>>> >>             //.createLocalEnvironmentWithWebUI(Configuration())
>>> >>
>>> >>         val properties = buildPropertiesFromArgs(args)
>>> >>         val stream = pipeline
>>> >>             .addSource(readFromKafka("events", properties))
>>> >>             .assignTimestampsAndWatermarks(
>>> >>                 WatermarkStrategy
>>> >>                     .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...))
>>> >>                     .withTimestampAssigner { event: Event, _: Long ->
>>> >>                         // Assign the created timestamp as the event timestamp
>>> >>                         Instant(event.createdTimestamp).millis
>>> >>                     }
>>> >>             )
>>> >>
>>> >>         // There are multiple data sources that each have their own windows and allowed lateness
>>> >>         // so ensure that each source only handles records for it
>>> >>         DataSources.forEach { source ->
>>> >>             stream
>>> >>                 .filter { event ->
>>> >>                     event.source == source.name
>>> >>                 }
>>> >>                 .keyBy { event ->
>>> >>                     //print("Keying record with id ${record.`id$1`} by tenant ${record.`source$1`.tenantName}")
>>> >>                     event.tenant
>>> >>                 }
>>> >>                 .window(
>>> >>                     TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
>>> >>                 )
>>> >>                 .allowedLateness(
>>> >>                     Time.minutes(source.allowedLateness)
>>> >>                 )
>>> >>                 .process(
>>> >>                     // This just contains some logic to take the existing windows and construct a file
>>> >>                     // using the window range and keys (tenant/source) with the values being
>>> >>                     // an aggregation of all of the records
>>> >>                     WindowedEventProcessFunction(source.name)
>>> >>                 )
>>> >>                 .map { summary ->
>>> >>                     // This would be a sink to write to a file
>>> >>                 }
>>> >>         }
>>> >>         pipeline.execute("event-processor")
>>> >>     }
>>> >>
>>> >>     My overarching question is really - *Can I properly separate the
>>> >>     data with custom watermark strategies and ensure that keying (or
>>> >>     some other construct) is enough to allow each tenant/source
>>> >>     combination to be treated as its own stream with its own
>>> >>     watermarking?* I know I could possibly break the single topic up
>>> >>     into multiple disparate topics, however that level of granularity
>>> >>     would likely result in several thousand (7000+) topics so I'm
>>> >>     hoping that some of the constructs available within Flink may help
>>> >>     with this (WatermarkStrategies, etc.).
>>> >>
>>> >>     Any recommendations / advice would be extremely helpful as I'm
>>> >>     quite new to the Flink world, however I have quite a bit of
>>> >>     experience in Apache Beam, Kafka Streams, and a smattering of
>>> >>     other streaming technologies.
>>> >>
>>> >>     Thanks much,
>>> >>
>>> >>     Rion
>>> >>
>>>
>>>
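
For readers following the thread: the per-tenant approach David describes
above (track the largest timestamp seen per key, subtract the bounded delay,
then fire any windows that value has passed) can be sketched in plain Java
with no Flink dependency. This is only an illustration under assumptions.
The class name PerTenantWindows and the in-memory maps standing in for
ValueState / MapState are hypothetical, the "- 1" in the watermark mirrors
how Flink's bounded-out-of-orderness watermarks are computed, event times
are assumed non-negative, and allowed lateness is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java emulation of per-tenant watermarks and tumbling windows.
// In a real job the two maps would live in keyed state (ValueState / MapState).
class PerTenantWindows {
    private final long windowSize; // window length in ms
    private final long maxDelay;   // bounded out-of-orderness in ms

    // Largest event timestamp seen so far, per tenant.
    private final Map<String, Long> maxTimestamp = new HashMap<>();
    // Open windows per tenant, keyed (and sorted) by window end time.
    private final Map<String, TreeMap<Long, StringBuilder>> windows = new HashMap<>();

    PerTenantWindows(long windowSize, long maxDelay) {
        this.windowSize = windowSize;
        this.maxDelay = maxDelay;
    }

    // Handles one event; returns "tenant@windowEnd=contents" for each window it closes.
    List<String> process(String tenant, long eventTime, String payload) {
        long newMax = Math.max(maxTimestamp.getOrDefault(tenant, Long.MIN_VALUE), eventTime);
        maxTimestamp.put(tenant, newMax);
        long watermark = newMax - maxDelay - 1; // this tenant's own watermark

        long windowEnd = eventTime - (eventTime % windowSize) + windowSize - 1;
        TreeMap<Long, StringBuilder> open =
                windows.computeIfAbsent(tenant, t -> new TreeMap<>());
        if (windowEnd > watermark) {
            open.computeIfAbsent(windowEnd, e -> new StringBuilder()).append(payload);
        } // otherwise the event is late for this tenant and is dropped

        // Fire every window of this tenant whose end the watermark has reached.
        List<String> fired = new ArrayList<>();
        Iterator<Map.Entry<Long, StringBuilder>> it =
                open.headMap(watermark, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, StringBuilder> entry = it.next();
            fired.add(tenant + "@" + entry.getKey() + "=" + entry.getValue());
            it.remove();
        }
        return fired;
    }

    public static void main(String[] args) {
        PerTenantWindows w = new PerTenantWindows(10, 2);
        System.out.println(w.process("a", 1, "x"));
        System.out.println(w.process("b", 25, "y")); // tenant b does not advance tenant a
        System.out.println(w.process("a", 5, "z"));
        System.out.println(w.process("a", 12, "w")); // closes tenant a's first window
    }
}
```

Because each tenant has its own maximum timestamp, a burst of newer events
for one tenant never closes another tenant's windows. Out-of-order events
within a single tenant can still be dropped, which is why the thread
suggests a larger bounded delay or a per-tenant Kafka partition.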

Re: Handling Data Separation / Watermarking from Kafka in Flink

Posted by David Anderson <da...@apache.org>.
When bounded Flink sources reach the end of their input, a special
watermark with the value Watermark.MAX_WATERMARK is emitted that will take
care of flushing all windows.
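
As a rough illustration of why that final watermark flushes everything, here
is a small plain-Java stand-in for event-time windows (no Flink involved).
The class EventTimeFlushDemo and its methods are hypothetical. The only fact
taken from Flink is that Watermark.MAX_WATERMARK carries the timestamp
Long.MAX_VALUE, which is at or beyond every possible window end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal stand-in for event-time windows: a window fires once the
// watermark reaches its end timestamp.
class EventTimeFlushDemo {
    private final long size; // tumbling window length in ms
    private final TreeMap<Long, List<String>> windows = new TreeMap<>();

    EventTimeFlushDemo(long size) {
        this.size = size;
    }

    // Adds a value to the tumbling window that contains the given timestamp.
    void add(long timestamp, String value) {
        long end = timestamp - (timestamp % size) + size - 1;
        windows.computeIfAbsent(end, e -> new ArrayList<>()).add(value);
    }

    // Fires (returns and removes) every window whose end is <= the watermark.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!windows.isEmpty() && windows.firstKey() <= watermark) {
            Map.Entry<Long, List<String>> entry = windows.pollFirstEntry();
            fired.add(entry.getKey() + "=" + entry.getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeFlushDemo demo = new EventTimeFlushDemo(10);
        demo.add(1, "a");
        demo.add(4, "b");
        demo.add(15, "c");
        // A normal watermark only closes the windows it has passed.
        System.out.println(demo.advanceWatermark(9));
        // The end-of-input watermark closes whatever is still open.
        System.out.println(demo.advanceWatermark(Long.MAX_VALUE));
    }
}
```

Note that this only applies to logic driven by watermarks (built-in windows
or event-time timers). Timing logic kept entirely in processElement, as
discussed earlier in the thread, never sees the watermark and would need its
own end-of-input handling.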

One approach is to use a DeserializationSchema or
KafkaDeserializationSchema with an implementation of isEndOfStream that
returns true when the end of the input stream has been reached; something
like this, perhaps:

public class TestDeserializer extends YourKafkaDeserializer<T> {
  // Tests send this as the last record.
  public final static String END_APP_MARKER = "END_APP_MARKER";

  @Override
  public boolean isEndOfStream(T nextElement) {
    return END_APP_MARKER.equals(nextElement.getRawData());
  }
}
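
To make the control flow concrete, here is a tiny plain-Java sketch of the
end-marker idea, independent of Flink and Kafka. EndMarkerDemo and consume
are hypothetical names, and the predicate merely plays the role that
isEndOfStream plays in the deserializer above. Dropping the marker record
itself is a choice made in this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class EndMarkerDemo {
    static final String END_APP_MARKER = "END_APP_MARKER";

    // Consumes records until the predicate reports end-of-stream.
    // The marker and anything after it are not passed on.
    static List<String> consume(Iterable<String> records, Predicate<String> isEndOfStream) {
        List<String> emitted = new ArrayList<>();
        for (String record : records) {
            if (isEndOfStream.test(record)) {
                break; // the source is finished; the job can now terminate
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("r1", "r2", END_APP_MARKER, "r3");
        System.out.println(consume(input, END_APP_MARKER::equals));
    }
}
```

In a test, the marker would be produced to the topic after the last real
record, so the source finishes once everything before it has been read.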

Or with Flink 1.12, you could use the new KafkaSource with its setBounded
option.

Best,
David


On Mon, Mar 1, 2021 at 6:56 PM Rion Williams <ri...@gmail.com> wrote:

> Hey David et all,
>
> I had one follow up question for this as I've been putting together some
> integration/unit tests to verify that things are working as expected with
> finite datasets (e.g. a text file with several hundred records that are
> serialized, injected into Kafka, and processed through the pipeline). I'm
> wondering if there's a good strategy to handle these finite sets (i.e. when
> I'm done reading through all of the records that I care about, I'd need to
> trigger something to explicitly flush the windows / evict messages. I'm not
> sure what a great approach would be to handle here? I don't think there's
> an easy way to simulate processing time delays outside of an explicit
> Thread.sleep() call prior to injecting some messages into the running
> pipeline asynchronously.
>
> Any recommendations for handling something like this? I must imagine that
> it's a fairly common use-case for testing, but maybe not?
>
> Thanks much!
>
> Rion
>
> On Sat, Feb 27, 2021 at 10:56 AM Rion Williams <ri...@gmail.com>
> wrote:
>
>> Thanks David,
>>
>> I figured that the correct approach would obviously be to adopt a keying
>> strategy upstream to ensure the same data that I used as a key downstream
>> fell on the same partition (ensuring the ordering guarantees I’m looking
>> for).
>>
>> I’m guessing implementation-wise, when I would normally evict a window
>> after some event time and allowed lateness, I could set a timer or just
>> explicitly keep the window open for some additional time to allow for out
>> of order data to make its way into the window.
>>
>> Either way - I think the keying is probably the right approach, but I
>> wanted to consider any other options should that become an issue upstream.
>>
>> Thanks!
>>
>> Rion
>>
>> On Feb 27, 2021, at 10:21 AM, David Anderson <da...@apache.org>
>> wrote:
>>
>> 
>> Rion,
>>
>> If you can arrange for each tenant's events to be in only one kafka
>> partition, that should be the best way to simplify the processing you need
>> to do. Otherwise, a simple change that may help would be to increase the
>> bounded delay you use in calculating your own per-tenant watermarks,
>> thereby making late events less likely.
>>
>> David
>>
>> On Sat, Feb 27, 2021 at 3:29 AM Rion Williams <ri...@gmail.com>
>> wrote:
>>
>>> David and Timo,
>>>
>>> Firstly, thank you both so much for your contributions and advice. I
>>> believe I’ve implemented things along the lines that you both detailed and
>>> things appear to work just as expected (e.g. I can see things arriving,
>>> being added to windows, discarding late records, and ultimately writing out
>>> files as expected).
>>>
>>> With that said, I have one question / issue that I’ve run into with
>>> handling the data coming my Kafka topic. Currently, my tenant/source (i.e.
>>> my key) may be distributed across the 10 partitions of my Kafka topic. With
>>> the way that I’m consuming from this topic (with a Kafka Consumer), it
>>> looks like my data is arriving in a mixed order which seems to be causing
>>> my own watermarks (those stored in my ValueState) to process as later data
>>> may arrive earlier than other data and cause my windows to be evicted.
>>>
>>> I’m currently using the `withNoWatermarks()` along with a custom
>>> timestamp assigned to handle all of my timestamping, but is there a
>>> mechanism to handle the mixed ordering across partitions in this scenario
>>> at the Flink level?
>>>
>>> I know the answer here likely lies with Kafka and adopting a better
>>> keying strategy to ensure the same tenant/source (my key) lands on the same
>>> partition, which by definition ensures ordering. I’m just wondering if
>>> there’s some mechanism to accomplish this post-reading from Kafka in Flink
>>> within my pipeline to handle things in a similar fashion?
>>>
>>> Again - thank you both so much, I’m loving the granularity and control
>>> that Flink has been providing me over other streaming technologies I’ve
>>> used in the past. I’m totally sold on it and am looking forward to doing
>>> more incredible things with it.
>>>
>>> Best regards,
>>>
>>> Rion
>>>
>>> On Feb 26, 2021, at 4:36 AM, David Anderson <da...@apache.org>
>>> wrote:
>>>
>>> 
>>> Yes indeed, Timo is correct -- I am proposing that you not use timers at
>>> all. Watermarks and event-time timers go hand in hand -- and neither
>>> mechanism can satisfy your requirements.
>>>
>>> You can instead put all of the timing logic in the processElement method
>>> -- effectively emulating what you would get if Flink were to offer per-key
>>> watermarking.
>>>
>>> The reason that the PseudoWindow example is using MapState is that for
>>> each key/tenant, more than one window can be active simultaneously. This
>>> occurs because the event stream is out-of-order with respect to time, so
>>> events for the "next window" are probably being processed before "the
>>> previous" window is complete. And if you want to accommodate allowed
>>> lateness, the requirement to have several windows open at once becomes even
>>> more important.
>>>
>>> MapState gives you a per-tenant hashmap, where each entry in that map
>>> corresponds to an open window for some particular tenant, where the map's
>>> key is the timestamp for a window, and the value is whatever state you want
>>> that window to hold.
>>>
>>> Best regards,
>>> David
>>>
>>>
>>>
>>>
>>> On Fri, Feb 26, 2021 at 9:44 AM Timo Walther <tw...@apache.org> wrote:
>>>
>>>> Hi Rion,
>>>>
>>>> I think what David was refering to is that you do the entire time
>>>> handling yourself in process function. That means not using the
>>>> `context.timerService()` or `onTimer()` that Flink provides but calling
>>>> your own logic based on the timestamps that enter your process function
>>>> and the stored state.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 26.02.21 00:29, Rion Williams wrote:
>>>> > 
>>>> > Hi David,
>>>> >
>>>> > Thanks for your prompt reply, it was very helpful and the
>>>> PseudoWindow
>>>> > example is excellent. I believe it closely aligns with an approach
>>>> that
>>>> > I was tinkering with but seemed to be missing a few key pieces. In my
>>>> > case, I'm essentially going to want to be aggregating the messages
>>>> that
>>>> > are coming into the window (a simple string-concatenation aggregation
>>>> > would work). Would I need another form of state to hold that, as
>>>> looking
>>>> > through this example with naive eyes, it seems that this function is
>>>> > currently storing multiple windows in state via the MapState provided:
>>>> >
>>>> > // Keyed, managed state, with an entry for each window, keyed by the
>>>> > window's end time.
>>>> > // There is a separate MapState object for each driver.
>>>> > private transient MapState<Long, Float> sumOfTips;
>>>> >
>>>> > If I wanted to perform an aggregation for each key/tenant, would a
>>>> > MapState be appropriate? Such as a MapState<Long, String> if I was
>>>> doing
>>>> > a string aggregation, so that within my processElement function I
>>>> could
>>>> > use something similar for building these aggregations and ultimately
>>>> > triggering them:
>>>> >
>>>> > // Keep track of a tenant/source specific watermark
>>>> > private lateinit var currentWatermark: ValueState<Long>
>>>> > // Keep track of the contents of each of the windows where the key
>>>> > represents the close
>>>> > // of the window and the contents represents an accumulation of the
>>>> > records for that window
>>>> > private lateinit var windowContents: MapState<Long, String>
>>>> >
>>>> > If that's the case, this is what I've thrown together thus far and I
>>>> > feel like it's moving in the right direction:
>>>> >
>>>> > class MagicWindow(private val duration: Long, private val lateness:
>>>> Long):
>>>> >      KeyedProcessFunction<String, Event, FileOutput>(){
>>>> >
>>>> >      // Keep track of a tenant/source specific watermark
>>>> >      private lateinit var currentWatermark: ValueState<Long>
>>>> >      // Keep track of the contents of each of the windows where the
>>>> key
>>>> > represents the close
>>>> >      // of the window and the contents represents an accumulation of
>>>> the
>>>> > records for that window
>>>> >      private lateinit var windowContents: MapState<Long, String>
>>>> >
>>>> >      override fun open(config: Configuration) {
>>>> >          currentWatermark = runtimeContext.getState(watermark)
>>>> >          currentWatermark.update(Long.MIN_VALUE)
>>>> >      }
>>>> >
>>>> >      override fun processElement(element: Event, context: Context,
>>>> out:
>>>> > Collector<FileOutput>) {
>>>> >          // Resolve the event time
>>>> >          val eventTime: Long = getEventTime(element)
>>>> >
>>>> >          // Update watermark (if applicable)
>>>> >          if (currentWatermark.value() < eventTime){
>>>> >              currentWatermark.update(eventTime)
>>>> >          }
>>>> >
>>>> >          // Define a timer for this window
>>>> >          val timerService = context.timerService()
>>>> >
>>>> >          if (eventTime <= timerService.currentWatermark()) {
>>>> >              // This event is late; its window has already been
>>>> triggered.
>>>> >          } else {
>>>> >              // Determine the "actual" window closure and start a
>>>> timer
>>>> > for it
>>>> >              // (eventTime + window
>>>> >              val endOfWindow= eventTime - (eventTime % duration) +
>>>> > duration - 1
>>>> >
>>>> >              // Schedule a callback for when the window has been
>>>> completed.
>>>> >              timerService.registerEventTimeTimer(endOfWindow)
>>>> >
>>>> >              // Add this element to the corresponding aggregation for
>>>> > this window
>>>> >              windowContents.put(endOfWindow,
>>>> windowContents[endOfWindow]
>>>> > + "$element")
>>>> >          }
>>>> >      }
>>>> >
>>>> >      override fun onTimer(timestamp: Long, context: OnTimerContext,
>>>> out:
>>>> > Collector<FileOutput>) {
>>>> >          val key = context.currentKey
>>>> >          val currentAggregation: String =
>>>> windowContents.get(timestamp)
>>>> >
>>>> >          // Output things here and clear the current aggregation for
>>>> this
>>>> >          // tenant/source combination in this window
>>>> >      }
>>>> >
>>>> >      companion object {
>>>> >          private val watermark = ValueStateDescriptor(
>>>> >              "watermark",
>>>> >              Long::class.java
>>>> >          )
>>>> >
>>>> >          private val windowContents = MapStateDescriptor(
>>>> >              "window-contents",
>>>> >              Long::class.java,
>>>> >              String::class.java
>>>> >          )
>>>> >
>>>> >          fun getEventTime(element: Event): Long {
>>>> >              return
>>>> Instant(element.`source$1`.createdTimestamp).millis
>>>> >          }
>>>> >      }
>>>> > }
>>>> >
>>>> > Is something glaringly off with this? I’ll need to do some
>>>> additionally
>>>> > reading on the timers, but any additional clarification would be
>>>> greatly
>>>> > appreciated.
>>>> >
>>>> > Thanks so much for your initial response again!
>>>> >
>>>> > Rion
>>>> >
>>>> >> On Feb 25, 2021, at 3:27 PM, David Anderson <da...@apache.org>
>>>> wrote:
>>>> >>
>>>> >> 
>>>> >> Rion,
>>>> >>
>>>> >> What you want isn't really achievable with the APIs you are using.
>>>> >> Without some sort of per-key (per-tenant) watermarking -- which
>>>> Flink
>>>> >> doesn't offer -- the watermarks and windows for one tenant can be
>>>> held
>>>> >> up by the failure of another tenant's events to arrive in a timely
>>>> manner.
>>>> >>
>>>> >> However, your pipeline is pretty straightforward, and it shouldn't
>>>> be
>>>> >> particularly difficult to accomplish what you want. What you can do
>>>> is
>>>> >> to ignore the built-in watermarking and windowing APIs, and build
>>>> >> equivalent functionality in the form of a KeyedProcessFunction.
>>>> >>
>>>> >> The Flink docs include an example [1] showing how to implement your
>>>> >> own tumbling event time windows with a process function. That
>>>> >> implementation assumes you can rely on watermarks for triggering the
>>>> >> windows; you'll have to do that differently.
>>>> >>
>>>> >> What you can do instead is to track, in ValueState, the largest
>>>> >> timestamp you've seen so far (for each key/tenant). Whenever that
>>>> >> advances, you can subtract the bounded-out-of-orderness duration
>>>> from
>>>> >> that timestamp, and then check to see if the resulting value is now
>>>> >> large enough to trigger any of the windows for that key/tenant.
>>>> >>
>>>> >> Handling allowed lateness should be pretty straightforward.
>>>> >>
>>>> >> Hope this helps,
>>>> >> David
>>>> >>
>>>> >> [1]
>>>> >>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
>>>> >> <
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
>>>> >
>>>> >>
>>>> >> On Thu, Feb 25, 2021 at 9:05 PM Rion Williams <rionmonster@gmail.com
>>>> >> <ma...@gmail.com>> wrote:
>>>> >>
>>>> >>     Hey folks, I have a somewhat high-level/advice question regarding
>>>> >>     Flink and whether it has the mechanisms in place to accomplish
>>>> >>     what I’m trying to do. I’ve spent a good bit of time using Apache
>>>> >>     Beam, but recently pivoted over to native Flink simply because
>>>> >>     some of the connectors weren’t as mature or didn’t support some
>>>> >>     of the functionality that I needed.
>>>> >>
>>>> >>     Basically - I have a single Kafka topic with 10 partitions that
>>>> >>     I’m consuming from. This is a multi-tenant topic containing data
>>>> >>     that comes in at various times from various tenants and is not at
>>>> >>     all guaranteed to be in order, at least with regard to “event
>>>> >>     time”, which is what I care about.
>>>> >>
>>>> >>     What I’m trying to accomplish is this: *Given a multi-tenant
>>>> >>     topic with records eventually distributed across partitions, is
>>>> >>     it possible to consume and window each of these records
>>>> >>     independently of one another without one tenant potentially
>>>> >>     influencing another and write out to separate files per
>>>> >>     tenant/source (i.e. some other defined property on the
>>>> >>     records)?*
>>>> >>
>>>> >>     My pipeline currently looks something like this:
>>>> >>
>>>> >>     @JvmStatic
>>>> >>     fun main(args: Array<String>) {
>>>> >>         val pipeline = StreamExecutionEnvironment
>>>> >>             .getExecutionEnvironment()
>>>> >>             //.createLocalEnvironmentWithWebUI(Configuration())
>>>> >>
>>>> >>         val properties = buildPropertiesFromArgs(args)
>>>> >>         val stream = pipeline
>>>> >>             .addSource(readFromKafka("events", properties))
>>>> >>             .assignTimestampsAndWatermarks(
>>>> >>                 WatermarkStrategy
>>>> >>                     .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...))
>>>> >>                     .withTimestampAssigner { event: Event, _: Long ->
>>>> >>                         // Assign the created timestamp as the event timestamp
>>>> >>                         Instant(event.createdTimestamp).millis
>>>> >>                     }
>>>> >>             )
>>>> >>
>>>> >>         // There are multiple data sources that each have their own
>>>> >>         // windows and allowed lateness, so ensure that each source
>>>> >>         // only handles records for it
>>>> >>         DataSources.forEach { source ->
>>>> >>             stream
>>>> >>                 .filter { event ->
>>>> >>                     event.source == source.name
>>>> >>                 }
>>>> >>                 .keyBy { event ->
>>>> >>                     //print("Keying record with id ${record.`id$1`} by tenant ${record.`source$1`.tenantName}")
>>>> >>                     event.tenant
>>>> >>                 }
>>>> >>                 .window(
>>>> >>                     TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
>>>> >>                 )
>>>> >>                 .allowedLateness(
>>>> >>                     Time.minutes(source.allowedLateness)
>>>> >>                 )
>>>> >>                 .process(
>>>> >>                     // This just contains some logic to take the
>>>> >>                     // existing windows and construct a file using
>>>> >>                     // the window range and keys (tenant/source),
>>>> >>                     // with the values being an aggregation of all
>>>> >>                     // of the records
>>>> >>                     WindowedEventProcessFunction(source.name)
>>>> >>                 )
>>>> >>                 .map { summary ->
>>>> >>                     // This would be a sink to write to a file
>>>> >>                 }
>>>> >>         }
>>>> >>         pipeline.execute("event-processor")
>>>> >>     }
>>>> >>
>>>> >>     My overarching question is really - *Can I properly separate the
>>>> >>     data with custom watermark strategies and ensure that keying (or
>>>> >>     some other construct) is enough to allow each tenant/source
>>>> >>     combination to be treated as its own stream with its own
>>>> >>     watermarking?* I know I could possibly break the single topic up
>>>> >>     into multiple disparate topics; however, that level of
>>>> >>     granularity would likely result in several thousand (7000+)
>>>> >>     topics, so I'm hoping that some of the constructs available
>>>> >>     within Flink may help with this (WatermarkStrategies, etc.).
>>>> >>
>>>> >>     Any recommendations / advice would be extremely helpful, as I'm
>>>> >>     quite new to the Flink world, though I have quite a bit of
>>>> >>     experience in Apache Beam, Kafka Streams, and a smattering of
>>>> >>     other streaming technologies.
>>>> >>
>>>> >>     Thanks much,
>>>> >>
>>>> >>     Rion
>>>> >>
>>>>
>>>>