Posted to user@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2020/09/09 14:09:19 UTC

[DISCUSS] Deprecate and remove UnionList OperatorState

Hi Devs,

@Users: I'm cc'ing the user ML to see if there are any users that are 
relying on this feature. Please comment here if that is the case.

I'd like to discuss the deprecation and eventual removal of UnionList 
Operator State, aka Operator State with Union Redistribution. If you 
don't know what I'm talking about you can take a look in the 
documentation: [1]. It's not documented thoroughly because it started 
out as mostly an internal feature.

The immediate main reason for removing this is also mentioned in the 
documentation: "Do not use this feature if your list may have high 
cardinality. Checkpoint metadata will store an offset to each list 
entry, which could lead to RPC framesize or out-of-memory errors." The 
insidious part of this limitation is that you will only notice that 
there is a problem when it is too late. Checkpointing will still work 
and a program can continue when the state size is too big. The system 
will only fail when trying to restore from a snapshot that has union 
state that is too big. This could be fixed by working around that issue 
but I think there are more long-term issues with this type of state.
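
To make the restore-time behaviour concrete, here is a minimal Java sketch. It is a toy model, not Flink code: the class RedistributionSketch and its methods are hypothetical names made up for illustration, and the round-robin split is a simplification of even-split redistribution. It only counts how many list entries are handed to subtasks on restore; it does not model checkpoint metadata, RPC frames, or memory use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of operator list state redistribution on restore (not Flink code). */
class RedistributionSketch {

    /** Concatenates the per-subtask lists into one logical list. */
    static List<Integer> flatten(List<List<Integer>> perSubtask) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> entries : perSubtask) {
            all.addAll(entries);
        }
        return all;
    }

    /** Even-split style: every entry goes to exactly one new subtask. */
    static List<List<Integer>> evenSplit(List<List<Integer>> old, int newParallelism) {
        List<Integer> all = flatten(old);
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }

    /** Union style: every new subtask receives the complete list. */
    static List<List<Integer>> union(List<List<Integer>> old, int newParallelism) {
        List<Integer> all = flatten(old);
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    /** Total number of entries handed out across all subtasks. */
    static int totalEntries(List<List<Integer>> assignment) {
        int total = 0;
        for (List<Integer> entries : assignment) {
            total += entries.size();
        }
        return total;
    }

    public static void main(String[] args) {
        // Two old subtasks with three entries each, restored with parallelism 4.
        List<List<Integer>> old =
                Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));
        System.out.println("even-split: " + totalEntries(evenSplit(old, 4))); // 6
        System.out.println("union:      " + totalEntries(union(old, 4)));     // 24
    }
}
```

In this model the number of entries delivered with union redistribution grows with (entries x parallelism), whereas the even split stays at the number of entries. Nothing in the model fails while the state is being written, which matches the point that the problem only shows up on restore.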

I think we need to deprecate and remove the API for state that is not tied 
to a key. Keyed state is easy to reason about: the system can 
re-partition state and also re-partition records and therefore scale the 
system in and out. Operator state, on the other hand, is not tied to a 
key but to an operator. This is a more "physical" concept, if you will, 
that potentially ties business logic closer to the underlying runtime 
execution model, which in turn means fewer degrees of freedom for the 
framework, that is, Flink. This is future work, though; we should 
start by deprecating union list state because it is potentially the 
most dangerous type of state.

We currently use this state type internally in at least the 
StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However, 
we're in the process of hopefully getting rid of it there with our work 
on sources and sinks. Before we fully remove it, we should of course 
signal this to users by deprecating it.

What do you think?

Best,
Aljoscha

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Posted by Aljoscha Krettek <al...@apache.org>.
On 14.09.20 02:20, Steven Wu wrote:
> Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg
> sink use case, because we can't retrieve the checkpointId from
> the FunctionInitializationContext during the restore case. But we can move
> away from it if the restore context provides the checkpointId.

Is the code for this available as open source? I checked the Iceberg 
sink that's available in Iceberg proper and the one in Netflix 
Skunkworks: 
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L228

Both of them are only using operator state, not the union variant.

Best,
Aljoscha

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Posted by Steven Wu <st...@gmail.com>.
Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg
sink use case, because we can't retrieve the checkpointId from
the FunctionInitializationContext during the restore case. But we can move
away from it if the restore context provides the checkpointId.

On Sat, Sep 12, 2020 at 8:20 AM Alexey Trenikhun <ye...@msn.com> wrote:

> -1
>
> We use union state to generate sequences. Each operator instance generates
> offset0 + number-of-tasks - task-index + task-specific-counter * number-of-tasks
> (e.g. for 2 instances of the operator, one instance produces even numbers,
> the other odd). The last generated sequence number is stored in union list
> state. On restart, to know where to start from and avoid collisions with
> already generated numbers, we calculate offset0 as the max over the union
> list state.
>
> Alexey
>
> ------------------------------
> *From:* Seth Wiesman <sj...@gmail.com>
> *Sent:* Wednesday, September 9, 2020 9:37:03 AM
> *To:* dev <de...@flink.apache.org>
> *Cc:* Aljoscha Krettek <al...@apache.org>; user <us...@flink.apache.org>
> *Subject:* Re: [DISCUSS] Deprecate and remove UnionList OperatorState
>
> Generally +1
>
> The one use case of union state I've seen in production (outside
> of sources and sinks) is as a "poor man's" broadcast state. This was
> obviously before that feature was added, which is now a few years ago, so I
> don't know if those pipelines still exist. FWIW, if they do, the State
> Processor API can provide a migration path as it supports rewriting union
> state as broadcast state.
>
> Seth
>
> On Wed, Sep 9, 2020 at 10:21 AM Arvid Heise <ar...@ververica.com> wrote:
>
> +1 to getting rid of non-keyed state as is in general and for union state
> in particular. I had a hard time wrapping my head around the semantics of
> non-keyed state when designing the rescaling of unaligned checkpoints.
>
> The only plausible use cases are legacy sources and sinks. Both should also
> be reworked and deprecated.
>
> My main question is how to represent state in these two cases. For sources,
> state should probably be bound to splits. In that regard, split (id) may
> act as a key. More generally, there should probably be a concept that
> supersedes keys and includes splits.
>
> For sinks, I can see two cases:
> - Either we are in a keyed context, then state should be bound to the key.
> - Or we are in a non-keyed context, then state might be bound to the split
> (?) in case of a source->sink chaining.
> - Maybe it should also be a new(?) concept like output partition.
>
> It's not clear to me if there are more cases and if we can always find a
> good way to bind state to some sort of key, especially for arbitrary
> communication patterns (which we may need to replace as well potentially).
>

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Posted by Aljoscha Krettek <al...@apache.org>.
On 12.09.20 17:20, Alexey Trenikhun wrote:
> We use union state to generate sequences. Each operator instance generates offset0 + number-of-tasks - task-index + task-specific-counter * number-of-tasks (e.g. for 2 instances of the operator, one instance produces even numbers, the other odd). The last generated sequence number is stored in union list state. On restart, to know where to start from and avoid collisions with already generated numbers, we calculate offset0 as the max over the union list state.

Is this for testing or a production use case?

With the introduction of the new source interface in FLIP-27 [1] it will 
not be possible to access different state types in a sink. However, 
using the enumerator and readers should work for your use case, since 
the enumerator acts as a central coordinating component that would 
assign ranges to the different readers. Have you looked at this new 
development perchance?
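
As a rough illustration of the "central component assigns ranges" idea, here is a minimal Java sketch. It deliberately does not use the FLIP-27 interfaces: RangeAllocatorSketch, allocateRange and snapshot are hypothetical names, and the sketch only assumes that a single coordinator hands out disjoint number ranges and that its next free start value is what gets checkpointed.

```java
/** Toy central coordinator that hands out disjoint number ranges (hypothetical, not a Flink API). */
class RangeAllocatorSketch {

    private final long rangeSize;
    private long nextStart;

    RangeAllocatorSketch(long firstValue, long rangeSize) {
        this.nextStart = firstValue;
        this.rangeSize = rangeSize;
    }

    /** Returns {startInclusive, endExclusive}; no two calls return overlapping ranges. */
    synchronized long[] allocateRange() {
        long start = nextStart;
        nextStart += rangeSize;
        return new long[] {start, nextStart};
    }

    /** The only state that has to survive a restart: the next unassigned value. */
    synchronized long snapshot() {
        return nextStart;
    }

    public static void main(String[] args) {
        RangeAllocatorSketch coordinator = new RangeAllocatorSketch(0L, 100L);
        long[] forReaderA = coordinator.allocateRange(); // [0, 100)
        long[] forReaderB = coordinator.allocateRange(); // [100, 200)
        System.out.println(forReaderA[0] + ".." + forReaderA[1]);
        System.out.println(forReaderB[0] + ".." + forReaderB[1]);

        // After a restart, continue from the snapshotted value.
        RangeAllocatorSketch restored = new RangeAllocatorSketch(coordinator.snapshot(), 100L);
        long[] next = restored.allocateRange(); // [200, 300)
        System.out.println(next[0] + ".." + next[1]);
    }
}
```

With this shape, the coordinator state is a single number regardless of parallelism, so no task needs to see the state of every other task on restore.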

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface


Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Posted by Alexey Trenikhun <ye...@msn.com>.
-1

We use union state to generate sequences. Each operator instance generates offset0 + number-of-tasks - task-index + task-specific-counter * number-of-tasks (e.g. for 2 instances of the operator, one instance produces even numbers, the other odd). The last generated sequence number is stored in union list state. On restart, to know where to start from and avoid collisions with already generated numbers, we calculate offset0 as the max over the union list state.
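
The arithmetic above can be sketched in plain Java. This is only an illustration of the formula as written in this message, not the actual operator: SequenceSketch, next and restoreOffset are hypothetical names, the list passed to restoreOffset stands in for the contents of the union list state, and starting from 0 when there is no state is an assumption.

```java
import java.util.Arrays;
import java.util.List;

/** Illustration of the sequence formula from the message (hypothetical names, no Flink dependency). */
class SequenceSketch {

    /** offset0 + number-of-tasks - task-index + task-specific-counter * number-of-tasks */
    static long next(long offset0, int numberOfTasks, int taskIndex, long counter) {
        return offset0 + numberOfTasks - taskIndex + counter * numberOfTasks;
    }

    /** On restore, offset0 is the max over the last values stored by all tasks. */
    static long restoreOffset(List<Long> lastValuesOfAllTasks) {
        long max = 0L; // assumption: start from 0 when no state exists
        for (long value : lastValuesOfAllTasks) {
            max = Math.max(max, value);
        }
        return max;
    }

    public static void main(String[] args) {
        // First run: 2 tasks, offset0 = 0.
        System.out.println(next(0, 2, 0, 0) + ", " + next(0, 2, 0, 1)); // task 0: 2, 4
        System.out.println(next(0, 2, 1, 0) + ", " + next(0, 2, 1, 1)); // task 1: 1, 3

        // Restart: every task sees the last values of all tasks (4 and 3).
        long offset0 = restoreOffset(Arrays.asList(4L, 3L)); // 4
        System.out.println(next(offset0, 2, 0, 0)); // task 0 continues with 6
        System.out.println(next(offset0, 2, 1, 0)); // task 1 continues with 5
    }
}
```

Because number-of-tasks - task-index is at least 1 and differs per task, every value generated after a restore is larger than offset0 and the tasks never produce the same number. The scheme relies on every task seeing the last value of every other task on restore, which is what union redistribution provides.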

Alexey

________________________________
From: Seth Wiesman <sj...@gmail.com>
Sent: Wednesday, September 9, 2020 9:37:03 AM
To: dev <de...@flink.apache.org>
Cc: Aljoscha Krettek <al...@apache.org>; user <us...@flink.apache.org>
Subject: Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Generally +1

The one use case of union state I've seen in production (outside of sources and sinks) is as a "poor man's" broadcast state. This was obviously before that feature was added, which is now a few years ago, so I don't know if those pipelines still exist. FWIW, if they do, the State Processor API can provide a migration path as it supports rewriting union state as broadcast state.

Seth

On Wed, Sep 9, 2020 at 10:21 AM Arvid Heise <ar...@ververica.com> wrote:
+1 to getting rid of non-keyed state as is in general and for union state
in particular. I had a hard time wrapping my head around the semantics of
non-keyed state when designing the rescaling of unaligned checkpoints.

The only plausible use cases are legacy sources and sinks. Both should also
be reworked and deprecated.

My main question is how to represent state in these two cases. For sources,
state should probably be bound to splits. In that regard, split (id) may
act as a key. More generally, there should probably be a concept that
supersedes keys and includes splits.

For sinks, I can see two cases:
- Either we are in a keyed context, then state should be bound to the key.
- Or we are in a non-keyed context, then state might be bound to the split
(?) in case of a source->sink chaining.
- Maybe it should also be a new(?) concept like output partition.

It's not clear to me if there are more cases and if we can always find a
good way to bind state to some sort of key, especially for arbitrary
communication patterns (which we may need to replace as well potentially).


Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Posted by Seth Wiesman <sj...@gmail.com>.
Generally +1

The one use case of union state I've seen in production (outside
of sources and sinks) is as a "poor man's" broadcast state. This was
obviously before that feature was added, which is now a few years ago, so I
don't know if those pipelines still exist. FWIW, if they do, the State
Processor API can provide a migration path as it supports rewriting union
state as broadcast state.

Seth

On Wed, Sep 9, 2020 at 10:21 AM Arvid Heise <ar...@ververica.com> wrote:

> +1 to getting rid of non-keyed state as is in general and for union state
> in particular. I had a hard time wrapping my head around the semantics of
> non-keyed state when designing the rescaling of unaligned checkpoints.
>
> The only plausible use cases are legacy sources and sinks. Both should also
> be reworked and deprecated.
>
> My main question is how to represent state in these two cases. For sources,
> state should probably be bound to splits. In that regard, split (id) may
> act as a key. More generally, there should probably be a concept that
> supersedes keys and includes splits.
>
> For sinks, I can see two cases:
> - Either we are in a keyed context, then state should be bound to the key.
> - Or we are in a non-keyed context, then state might be bound to the split
> (?) in case of a source->sink chaining.
> - Maybe it should also be a new(?) concept like output partition.
>
> It's not clear to me if there are more cases and if we can always find a
> good way to bind state to some sort of key, especially for arbitrary
> communication patterns (which we may need to replace as well potentially).
>

> > system in and out. Operator state, on the other hand is not tied to a
> > key but an operator. This is a more "physical" concept, if you will,
> > that potentially ties business logic closer to the underlying runtime
> > execution model, which in turn means fewer degrees of freedom for the
> > framework, that is, Flink. This is future work, but we should
> > start with deprecating union list state because it is the potentially
> > most dangerous type of state.
> >
> > We currently use this state type internally in at least the
> > StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However,
> > we're in the process of hopefully getting rid of it there with our work
> > on sources and sinks. Before we fully remove it, we should of course
> > signal this to users by deprecating it.
> >
> > What do you think?
> >
> > Best,
> > Aljoscha
> >
>
>

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Posted by Arvid Heise <ar...@ververica.com>.
+1 to getting rid of non-keyed state in general and of union state
in particular. I had a hard time wrapping my head around the semantics of
non-keyed state when designing the rescaling of unaligned checkpoints.

The only plausible use cases are legacy sources and sinks. Both should also
be reworked and deprecated.

My main question is how to represent state in these two cases. For sources,
state should probably be bound to splits. In that regard, split (id) may
act as a key. More generally, there should probably be a concept that
supersedes keys and includes splits.

For sinks, I can see two cases:
- Either we are in a keyed context, then state should be bound to the key.
- Or we are in a non-keyed context, then state might be bound to the split
(?) in case of a source->sink chaining.
- Maybe it should also be a new(?) concept like output partition.

It's not clear to me if there are more cases and if we can always find a
good way to bind state to some sort of key, especially for arbitrary
communication patterns (which we may need to replace as well potentially).
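
As a rough sketch of the "split (id) as a key" idea for sources, the toy
model below (plain Python, not Flink code) keeps one offset per split and, on
rescale, hands each split's state to exactly one subtask. The names
owner_of and redistribute_by_split, the hash-modulo assignment, and the
example split ids are all hypothetical choices for illustration.

```python
import zlib
from typing import Dict, List


def owner_of(split_id: str, parallelism: int) -> int:
    """Pick the subtask that owns a split. CRC32 is used only because it
    is deterministic across runs; the assignment rule is an assumption."""
    return zlib.crc32(split_id.encode("utf-8")) % parallelism


def redistribute_by_split(state: Dict[str, int], parallelism: int) -> List[Dict[str, int]]:
    """Give each subtask only the state of the splits it owns."""
    subtasks: List[Dict[str, int]] = [{} for _ in range(parallelism)]
    for split_id, offset in state.items():
        subtasks[owner_of(split_id, parallelism)][split_id] = offset
    return subtasks


if __name__ == "__main__":
    # Hypothetical split ids mapped to read offsets.
    state = {"topic-0": 42, "topic-1": 7, "topic-2": 13}
    for parallelism in (1, 2, 5):
        print(parallelism, redistribute_by_split(state, parallelism))
```

In this model the state can be re-partitioned the same way keyed state can,
because no subtask ever needs the full list.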

On Wed, Sep 9, 2020 at 4:09 PM Aljoscha Krettek <al...@apache.org> wrote:

> Hi Devs,
>
> @Users: I'm cc'ing the user ML to see if there are any users that are
> relying on this feature. Please comment here if that is the case.
>
> I'd like to discuss the deprecation and eventual removal of UnionList
> Operator State, aka Operator State with Union Redistribution. If you
> don't know what I'm talking about you can take a look in the
> documentation: [1]. It's not documented thoroughly because it started
> out as mostly an internal feature.
>
> The immediate main reason for removing this is also mentioned in the
> documentation: "Do not use this feature if your list may have high
> cardinality. Checkpoint metadata will store an offset to each list
> entry, which could lead to RPC framesize or out-of-memory errors." The
> insidious part of this limitation is that you will only notice that
> there is a problem when it is too late. Checkpointing will still work
> and a program can continue when the state size is too big. The system
> will only fail when trying to restore from a snapshot that has union
> state that is too big. This could be fixed by working around that issue
> but I think there are more long-term issues with this type of state.
>
> I think we need to deprecate and remove API for state that is not tied
> to a key. Keyed state is easy to reason about, the system can
> re-partition state and also re-partition records and therefore scale the
> system in and out. Operator state, on the other hand is not tied to a
> key but an operator. This is a more "physical" concept, if you will,
> that potentially ties business logic closer to the underlying runtime
> execution model, which in turn means fewer degrees of freedom for the
> framework, that is, Flink. This is future work, but we should
> start with deprecating union list state because it is the potentially
> most dangerous type of state.
>
> We currently use this state type internally in at least the
> StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However,
> we're in the process of hopefully getting rid of it there with our work
> on sources and sinks. Before we fully remove it, we should of course
> signal this to users by deprecating it.
>
> What do you think?
>
> Best,
> Aljoscha
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
