Posted to user@flink.apache.org by Alex Cruise <al...@cluonflux.com> on 2021/04/07 20:39:10 UTC

Async + Broadcast?

Hi folks,

I have a somewhat complex Flink job that has a few async stages, and a few
stateful stages. It currently loads its configuration on startup, and
doesn't attempt to refresh it.

Now I'm working on dynamic reconfiguration. I've written a polling source
which sends a configuration snapshot whenever anything has changed, I've
set up a broadcast of that source, and I'm updating the operators in the
data (i.e. not config) stream to be BroadcastProcessFunctions. But now I've
reached the first async operator, and I recall that async functions aren't
allowed to be stateful.

I've tried to find a best practice for this situation, without much luck.
My best idea so far is to insert a new stage before the async one, which
would tuple up each record with its corresponding config snapshot from the
most recent broadcast state. This would increase the amount of data that
needs to be serialized, and some of the configs are quite large, but would
allow me to continue using async IO.

Any suggestions?

Thanks!

-0xe1a

Re: Async + Broadcast?

Posted by Arvid Heise <ar...@apache.org>.
Hi Alex,

The easiest way to verify whether what you tried is working is to look at
Flink's Web UI and check the topology.

The broadcast side of the input will always be ... well, broadcast (= not
chained). So you only need to disable chaining on the non-broadcast input.

val parsed: DataStream<Record> = dataSource
  .disableChaining()
  .connect(configBroadcast)
  .process(Parse(initialConfigs))

Regarding objectReuse, it's safe to enable if you don't do any dirty hacks
on data that has already been output. So what you cannot do is store the
last element in your map function (without managed state) and use that to
calculate the new result.
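
A minimal sketch (MyEvent and UnsafeDelta are made up for illustration):

// enabling object reuse is one line on the execution config
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.config.enableObjectReuse()

data class MyEvent(val timestamp: Long)

// the kind of thing you cannot do: cache the last input in a plain field.
// With object reuse enabled, `last` may point to an object the runtime has
// already reused for a later record.
class UnsafeDelta : MapFunction<MyEvent, Long> {
  private var last: MyEvent? = null // plain field, not managed state

  override fun map(value: MyEvent): Long {
    val delta = value.timestamp - (last?.timestamp ?: value.timestamp)
    last = value // unsafe: with reuse, this object may carry the next record's data
    return delta
  }
}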


Re: Async + Broadcast?

Posted by Alex Cruise <al...@cluonflux.com>.
Thanks Arvid! I'm not completely clear on where to apply your suggestions.

I've included a sketch of my job below, and I have a couple questions:

1. It looks like enableObjectReuse() is a global setting; should I worry
about whether I'm using any mutable data between stages?
2. Should I disableChaining() on BOTH broadcast-dependent stages, or just
the one immediately preceding the async one?

Thanks!

-0xe1a

*Types:*

/** all the configs for a given tenant, as of the time when a change was
observed */
data class ConfigSnapshot(
  val tenantId: Long,
  val timestamp: Instant,
  val configs: Map<UUID, Config>
)

/** parse raw strings from input, rejecting those for unconfigured tenants */
class Parse(
  private val initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<String, ConfigSnapshot, Record>() {
  override fun processBroadcastElement(
    value: ConfigSnapshot,
    ctx: Context,
    out: Collector<Record>
  ) {
    val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
    snapshots.put(value.tenantId, value)
  }

  override fun processElement(
    value: String,
    ctx: ReadOnlyContext,
    out: Collector<Record>
  ) {
    // fall back to the bootstrap configs until the first broadcast arrives
    val validTenantIds = ctx.getBroadcastState(configSnapshotDescriptor)
      .immutableEntries()
      .map { it.key }
      .toSet()
      .ifEmpty { initialConfigs.keys }

    val parsed = Record(value)
    if (validTenantIds.contains(parsed.tenantId)) {
      out.collect(parsed)
    }
  }
}

/** given a parsed record, identify which config(s) are interested in it,
and send an output value of the record tupled with the interested config */
class ValidateAndDistribute(
  private val initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<Record, ConfigSnapshot, Pair<Record, Config>>() {
  override fun processBroadcastElement(
    value: ConfigSnapshot,
    ctx: Context,
    out: Collector<Pair<Record, Config>>
  ) {
    val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
    snapshots.put(value.tenantId, value)
  }

  override fun processElement(
    value: Record,
    ctx: ReadOnlyContext,
    out: Collector<Pair<Record, Config>>
  ) {
    // fall back to the bootstrap snapshots until the first broadcast arrives
    val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
      .immutableEntries()
      .associate { it.key to it.value }
      .ifEmpty { initialConfigs }

    val configsForThisTenant = snapshots[value.tenantId]?.configs.orEmpty()

    val configsInterestedInThisRecord = configsForThisTenant.values.filter {
      it.interestedIn(value)
    }

    for (config in configsInterestedInThisRecord) {
      out.collect(value to config)
    }
  }
}

/** given a pair of Record and Config, run the async operation and send an
enriched record including the result */
class Enrich() : RichAsyncFunction<Pair<Record, Config>, EnrichedRecord>
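
Sketched a bit further, Enrich would look roughly like this (LookupService
and the EnrichedRecord constructor are stand-ins for the real async client):

class Enrich : RichAsyncFunction<Pair<Record, Config>, EnrichedRecord>() {
  @Transient private var lookupService: LookupService? = null

  override fun open(parameters: Configuration) {
    lookupService = LookupService.connect() // hypothetical async client
  }

  override fun asyncInvoke(
    input: Pair<Record, Config>,
    resultFuture: ResultFuture<EnrichedRecord>
  ) {
    val (record, config) = input
    // lookup() returns a CompletableFuture; complete the ResultFuture from
    // its callback rather than blocking the operator thread
    lookupService!!.lookup(record, config).whenComplete { enrichment, error ->
      if (error != null) {
        resultFuture.completeExceptionally(error)
      } else {
        resultFuture.complete(listOf(EnrichedRecord(record, config, enrichment)))
      }
    }
  }
}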

*Job Pseudocode:*

val initialConfigs: Map<Long, ConfigSnapshot> = ???
val dataSource: DataStream<String> = ???
// from a legacy "while (true) { poll; sleep }" source
val configSource: DataStream<ConfigSnapshot> = ???

// the config-subscribing operators keep the broadcast state in a
// Map<tenantId: Long, ConfigSnapshot>
val configSnapshotDescriptor = MapStateDescriptor(
  "currentConfigSnapshots",
  Long::class.java,
  ConfigSnapshot::class.java
)

// Broadcast the snapshots
val configBroadcast: BroadcastStream<ConfigSnapshot> =
  configSource.broadcast(configSnapshotDescriptor)

val parsed: DataStream<Record> = dataSource
  .connect(configBroadcast)
  .process(Parse(initialConfigs))

// input records can be duplicated now, as there may be multiple Configs
// that are interested in a record
val validated: DataStream<Pair<Record, Config>> = parsed
  .connect(configBroadcast)
  .process(ValidateAndDistribute(initialConfigs))

val enriched: DataStream<EnrichedRecord> = AsyncDataStream.unorderedWait(
  validated,
  Enrich(),
  5L,
  TimeUnit.SECONDS
)

Re: Async + Broadcast?

Posted by Arvid Heise <ar...@apache.org>.
Hi Alex,

Your approach is completely valid. What you want is a chain between your
state-managing operator and the consuming async operations; that way there
is no serialization overhead.

To achieve that, you want to:
- use Flink 1.11+ [1]
- make sure that, if you have a legacy source, you disableChaining() before
your state-managing operator, as asyncIO cannot be (transitively) chained to
legacy sources. So it should be: source -> ... -> (forward channel) ->
(state-managing operator -> async1 -> async2 -> ...) -> ... -> sink
- enableObjectReuse() [2] to avoid copying objects

[1] https://issues.apache.org/jira/browse/FLINK-16219
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/execution_configuration.html
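
In (pseudo)code the shape would be roughly the following; env,
legacyPollingSource, configBroadcast, stateManagingFunction, and
asyncFunction are placeholders, not your actual operators:

val stateManaged = env
  .addSource(legacyPollingSource) // legacy source, cannot chain into asyncIO
  .disableChaining()              // break the chain right after the source
  .connect(configBroadcast)
  .process(stateManagingFunction) // the operator holding the broadcast state

// asyncIO can now chain directly onto the state-managing operator, so the
// records never cross a serialization boundary in between
val enriched = AsyncDataStream.unorderedWait(
  stateManaged,
  asyncFunction,
  5L,
  TimeUnit.SECONDS
)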


Re: Async + Broadcast?

Posted by Alex Cruise <al...@cluonflux.com>.
Thanks Austin! I'll proceed with my idea, and keep the bootstrap config :)

-0xe1a


Re: Async + Broadcast?

Posted by Austin Cawley-Edwards <au...@gmail.com>.
Hey Alex,

I'm not sure if there is a best practice here, but what I can tell you is
that I worked on a job that did exactly what you're suggesting with a
non-async operator to create a [record, config] tuple, which was then
passed to the async stage. Our config objects were also not tiny (~500 KB)
and our pipeline not huge (~1M records/day and ~1 GB of data/day), but this
setup worked quite well. I'd say if latency isn't your most important
metric, or if your pipeline is a similar size, the ease of async IO is
worth it.

One thing you'll have to look out for (if you haven't already) is
bootstrapping the config objects when the job starts, since the broadcast
from the polling source can happen later than receiving the first record –
we solved this by calling the polling source's service in the `open()`
method of the non-async operator and storing the initial configs in memory.
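
A rough sketch of that bootstrap, where ConfigService.fetchInitialSnapshots()
is a stand-in for whatever service backs your polling source:

class Parse : BroadcastProcessFunction<String, ConfigSnapshot, Record>() {
  // filled in once in open(); used as a fallback until the first broadcast
  @Transient private lateinit var initialConfigs: Map<Long, ConfigSnapshot>

  override fun open(parameters: Configuration) {
    // call the same service the polling source uses, synchronously, so
    // records that arrive before the first broadcast aren't dropped
    initialConfigs = ConfigService.fetchInitialSnapshots()
  }

  // processElement / processBroadcastElement as in the sketches above,
  // consulting initialConfigs while the broadcast state is still empty
}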

Hope that helps a bit,
Austin
