Posted to user@beam.apache.org by Rion Williams <ri...@gmail.com> on 2021/01/29 15:27:41 UTC

Handling Out-of-Order Windowing Event Times from Kafka to GCS

Hey folks,


I’ve been mulling over how to solve a given problem in Beam and thought I’d
reach out to a larger audience for some advice. At present things only seem
to be working sporadically, and I was curious if someone could provide a
sounding board to see if this workflow makes sense.

The primary high-level goal is to read records from Kafka that may arrive out
of order, window them in event time according to another property found on
the records, and eventually emit the contents of those windows by writing
them out to GCS.


The current pipeline looks roughly like the following:

val partitionedEvents = pipeline
    .apply("Read Events from Kafka",
        KafkaIO
            .read<String, Log>()
            .withBootstrapServers(options.brokerUrl)
            .withTopic(options.incomingEventsTopic)
            .withKeyDeserializer(StringDeserializer::class.java)
            .withValueDeserializerAndCoder(
                SpecificAvroDeserializer<Log>()::class.java,
                AvroCoder.of(Log::class.java)
            )
            .withReadCommitted()
            .commitOffsetsInFinalize()
            // Set the watermark to use a specific field for event time
            .withTimestampPolicyFactory { _, previousWatermark -> WatermarkPolicy(previousWatermark) }
            .withConsumerConfigUpdates(
                ImmutableMap.of<String, Any?>(
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
                    ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
                    "schema.registry.url", options.schemaRegistryUrl
                )
            )
            .withoutMetadata()
    )
    .apply("Logging Incoming Logs", ParDo.of(Events.log()))
    .apply("Rekey Logs by Tenant", ParDo.of(Events.key()))
    .apply("Partition Logs by Source",
        // This is a custom function that will partition incoming records
        // by a specific data source field
        Partition.of(dataSources.size, Events.partition<KV<String, Log>>(dataSources))
    )

dataSources.forEach { dataSource ->
    // Store a reference to the data source name to avoid serialization issues
    val sourceName = dataSource.name
    val tempDirectory = Directories.resolveTemporaryDirectory(options.output)

    // Grab all of the events for this specific partition and apply the
    // source-specific windowing strategies
    partitionedEvents[dataSource.partition]
        .apply(
            "Building Windows for $sourceName",
            SourceSpecificWindow.of<KV<String, Log>>(dataSource)
        )
        .apply("Group Windowed Logs by Key for $sourceName", GroupByKey.create())
        .apply("Log Events After Windowing for $sourceName", ParDo.of(Events.logAfterWindowing()))
        .apply(
            "Writing Windowed Logs to Files for $sourceName",
            FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
                .withNumShards(1)
                .by { row -> "${row.key}/${sourceName}" }
                .withDestinationCoder(StringUtf8Coder.of())
                .via(
                    Contextful.fn(SerializableFunction { logs -> Files.stringify(logs.value) }),
                    TextIO.sink()
                )
                .to(options.output)
                .withNaming { partition -> Files.name(partition) }
                .withTempDirectory(tempDirectory)
        )
}
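
As an aside, the Files.name(...) naming function isn't shown above; a
hypothetical destination-aware FileNaming (here called windowedFileNaming,
with the directory layout and extension being assumptions on my part) would
look roughly like this:

import org.apache.beam.sdk.io.Compression
import org.apache.beam.sdk.io.FileIO
import org.apache.beam.sdk.transforms.windowing.BoundedWindow
import org.apache.beam.sdk.transforms.windowing.IntervalWindow
import org.apache.beam.sdk.transforms.windowing.PaneInfo

// Hypothetical stand-in for Files.name(partition); the path layout and
// extension are assumptions
fun windowedFileNaming(destination: String): FileIO.Write.FileNaming =
    object : FileIO.Write.FileNaming {
        override fun getFilename(
            window: BoundedWindow,
            pane: PaneInfo,
            numShards: Int,
            shardIndex: Int,
            compression: Compression
        ): String {
            // Fixed windows produce IntervalWindows, so encode the window bounds
            // in the filename so each window for a tenant/source destination
            // lands in its own file
            val interval = window as IntervalWindow
            return "$destination/${interval.start()}-to-${interval.end()}-${shardIndex}-of-${numShards}.log"
        }
    }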

In a simpler, bulleted form, the overall pipeline looks something like this:

   - Read records from a single Kafka topic
   - Key all records by their tenant
   - Partition the stream by another event property (sketched below)
   - Iterate through the known partitions from the previous step
   - Apply custom windowing rules for each partition
   - Group windowed items by key
   - Write tenant-key pair groupings to GCS via FileIO
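
The partition step in that list boils down to a PartitionFn keyed off a field
on the record; a hypothetical standalone version (here called
partitionBySource, with the dataSource field on Log being an assumption)
might look like:

import org.apache.beam.sdk.transforms.Partition
import org.apache.beam.sdk.values.KV

// Hypothetical stand-in for Events.partition<KV<String, Log>>(dataSources);
// the dataSource field on the Avro Log record is an assumption
fun partitionBySource(dataSources: List<DataSource>): Partition.PartitionFn<KV<String, Log>> =
    Partition.PartitionFn<KV<String, Log>> { element, numPartitions ->
        val source = element.value.dataSource.toString()
        // Route the record to the partition registered for its data source,
        // falling back to the last partition for unrecognized sources
        dataSources.firstOrNull { it.name == source }?.partition ?: (numPartitions - 1)
    }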

The problem is that the incoming Kafka topic contains out-of-order data
across multiple tenants (e.g. events for tenant1 might be streaming in now,
but then a few minutes later you’ll get events for tenant2 in the same
partition, etc.).
*This would cause the watermark to bounce back and forth in time, since each
incoming record's event time is not guaranteed to continually increase, which
sounds like it would be a problem, but I'm not certain. It certainly seems
that while data is flowing through, some files are simply not being emitted
at all.*
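
The WatermarkPolicy used in withTimestampPolicyFactory isn't shown above; as
a rough sketch, a custom TimestampPolicy along these lines would pull the
event time off the record itself and only ever advance the watermark (the
epoch-millis "timestamp" field on Log is an assumption in this sketch):

import org.apache.beam.sdk.io.kafka.KafkaRecord
import org.apache.beam.sdk.io.kafka.TimestampPolicy
import org.apache.beam.sdk.transforms.windowing.BoundedWindow
import org.joda.time.Instant
import java.util.Optional

class WatermarkPolicy(previousWatermark: Optional<Instant>) : TimestampPolicy<String, Log>() {
    // Start from the previous watermark (if any) so restarts don't regress it
    private var currentWatermark: Instant =
        previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE)

    override fun getTimestampForRecord(
        ctx: TimestampPolicy.PartitionContext,
        record: KafkaRecord<String, Log>
    ): Instant {
        // Assumed: Log carries an epoch-millis "timestamp" field for event time
        val eventTime = Instant(record.kv.value.timestamp)

        // Only ever advance the watermark, even though individual record
        // timestamps may jump backwards and forwards in time
        if (eventTime.isAfter(currentWatermark)) {
            currentWatermark = eventTime
        }
        return eventTime
    }

    override fun getWatermark(ctx: TimestampPolicy.PartitionContext): Instant = currentWatermark
}

With a policy shaped like this, the per-partition watermark itself only moves
forward and tracks the maximum event time seen so far; it's the record
timestamps that jump around, so a tenant whose events arrive from the past
can land behind the watermark and outside the allowed lateness of its window.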


The custom windowing function is extremely simple and is aimed at emitting a
single window once the windowing duration and allowed lateness have elapsed:

object SourceSpecificWindow {
    fun <T> of(dataSource: DataSource): Window<T> {
        return Window.into<T>(FixedWindows.of(dataSource.windowDuration()))
            .triggering(Never.ever())
            .withAllowedLateness(dataSource.allowedLateness(), Window.ClosingBehavior.FIRE_ALWAYS)
            .discardingFiredPanes()
    }
}

However, the results seemed inconsistent: we'd see logging come out after the
window closed, but not necessarily files being written out to GCS.
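
For context, the post-windowing logging step is just a pass-through DoFn; a
hypothetical version of Events.logAfterWindowing() that records the window
and pane timing might look like the following (the exact fields logged are
assumptions):

import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.windowing.BoundedWindow
import org.apache.beam.sdk.values.KV
import org.slf4j.LoggerFactory

// Hypothetical stand-in for Events.logAfterWindowing(): log which window and
// pane a grouped key was emitted in, then pass the element through unchanged
class LogAfterWindowing : DoFn<KV<String, Iterable<Log>>, KV<String, Iterable<Log>>>() {
    companion object {
        private val logger = LoggerFactory.getLogger(LogAfterWindowing::class.java)
    }

    @ProcessElement
    fun process(ctx: ProcessContext, window: BoundedWindow) {
        logger.info(
            "Key {} emitted {} logs in window {} with pane timing {}",
            ctx.element().key,
            ctx.element().value.count(),
            window,
            ctx.pane().timing
        )
        ctx.output(ctx.element())
    }
}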

*Does anything seem blatantly wrong or incorrect with this approach? The data
can come in out of order within the source (i.e. right now, 2 hours ago, 5
minutes from now) and covers multiple tenants, but the aim is to ensure that
one tenant that keeps up to date won't drown out tenants whose data arrives
from the past.*

Would we potentially need another Beam application, or something to "split"
this single stream of events into sub-streams that are each processed
independently (so that each watermark progresses on its own)?

Any advice would be greatly appreciated or even just another set of eyes.
I'd be happy to provide any additional details that I could.

Thanks much,

Rion