Posted to dev@flink.apache.org by Nico Kruber <ni...@apache.org> on 2022/05/06 12:37:47 UTC

Re: [DISCUSS] FLIP-220: Temporal State

While working a bit more on this, David A and I noticed a couple of
inconsistencies in the proposed APIs:

a) the proposed BinarySortedMultiMapState class didn't actually have getters
that return multiple items per key, and
b) with the adapted API, and while keeping a single multi-map-like
implementation in the backend, we'd like to reopen the discussion on whether
we also want a user-facing BinarySortedMapState API, which can be simpler
but doesn't add any constraints to the state backends.

Let me go into details a bit more:
In a multi-map, a single key can be backed by a set of items, so the atomic
unit that should be retrievable is not a single item but rather something
like a Collection, an Iterable, or a List. Since we are already using
Iterable in the main API, how about the following?
```
public interface BinarySortedMultiMapState<UK, UV> extends State {
  void put(UK key, Iterable<UV> values) throws Exception;
  void add(UK key, UV value) throws Exception;

  Iterable<UV> valueAt(UK key) throws Exception;

  Map.Entry<UK, Iterable<UV>> firstEntry() throws Exception;
  Map.Entry<UK, Iterable<UV>> lastEntry() throws Exception;

  Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey)
      throws Exception;
  Iterable<Map.Entry<UK, Iterable<UV>>> readRangeUntil(UK endUserKey)
      throws Exception;
  Iterable<Map.Entry<UK, Iterable<UV>>> readRangeFrom(UK startUserKey)
      throws Exception;

  void clearRange(UK fromKey, UK toKey) throws Exception;
  void clearRangeUntil(UK endUserKey) throws Exception;
  void clearRangeFrom(UK startUserKey) throws Exception;
}
```

We also considered using Iterable<Map.Entry<UK, UV>> instead of Map.Entry<UK,
Iterable<UV>>, but that wouldn't match well with firstEntry() and lastEntry()
because a single key has no single first/last value. We also looked at
common MultiMap interfaces, and their getters always retrieve the whole
list/collection for a key. Since we don't want to promise too many details
to the user, we believe Iterable is our best choice for now; it can still be
"upgraded" to, e.g., List in the future without breaking client code.
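
To make the intended semantics a bit more tangible, here is a rough,
heap-only sketch of the multi-map getters and range operations. It is only
an illustration under a few assumptions: the class name
InMemorySortedMultiMap is made up, a TreeMap with natural key order stands
in for the binary sort order, both range bounds are treated as inclusive (as
in the earlier proposal quoted further down), and the "throws Exception"
clauses are dropped for brevity. It is not meant as the actual state backend
implementation.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical heap-only stand-in for the proposed state; not a Flink class. */
class InMemorySortedMultiMap<UK extends Comparable<UK>, UV> {
    // Assumption: natural key order stands in for the binary sort order.
    private final NavigableMap<UK, List<UV>> entries = new TreeMap<>();

    /** Replaces everything stored under the key. */
    void put(UK key, Iterable<UV> values) {
        List<UV> copy = new ArrayList<>();
        values.forEach(copy::add);
        entries.put(key, copy);
    }

    /** Appends a single value to the values already stored under the key. */
    void add(UK key, UV value) {
        entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** All values of one key; empty if the key is unknown. */
    Iterable<UV> valueAt(UK key) {
        return entries.getOrDefault(key, Collections.emptyList());
    }

    /** Entry with the smallest key, or null if the state is empty. */
    Map.Entry<UK, Iterable<UV>> firstEntry() {
        return view(entries.firstEntry());
    }

    /** Entry with the largest key, or null if the state is empty. */
    Map.Entry<UK, Iterable<UV>> lastEntry() {
        return view(entries.lastEntry());
    }

    /** Entries with fromKey <= key <= toKey, in key order. */
    Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey) {
        List<Map.Entry<UK, Iterable<UV>>> result = new ArrayList<>();
        for (Map.Entry<UK, List<UV>> e
                : entries.subMap(fromKey, true, toKey, true).entrySet()) {
            result.add(view(e));
        }
        return result;
    }

    /** Removes all entries with fromKey <= key <= toKey. */
    void clearRange(UK fromKey, UK toKey) {
        entries.subMap(fromKey, true, toKey, true).clear();
    }

    // Exposes the stored list only as an Iterable, as in the proposed API.
    private Map.Entry<UK, Iterable<UV>> view(Map.Entry<UK, List<UV>> e) {
        if (e == null) {
            return null;
        }
        Iterable<UV> values = e.getValue();
        return new AbstractMap.SimpleImmutableEntry<>(e.getKey(), values);
    }

    public static void main(String[] args) {
        InMemorySortedMultiMap<Long, String> state = new InMemorySortedMultiMap<>();
        state.add(10L, "a");
        state.add(10L, "b");
        state.add(20L, "c");
        System.out.println(state.valueAt(10L));           // prints [a, b]
        System.out.println(state.firstEntry().getKey());  // prints 10
        state.clearRange(10L, 10L);
        System.out.println(state.firstEntry().getKey());  // prints 20
    }
}
```

Note that firstEntry() hands back the key together with all of its values,
which is the reason for preferring Map.Entry<UK, Iterable<UV>> over
Iterable<Map.Entry<UK, UV>>.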

An appropriate map-like version of the multi-map API would be the following:
```
public interface BinarySortedMapState<UK, UV> extends State {
  void put(UK key, UV value) throws Exception;

  UV valueAt(UK key) throws Exception;

  Map.Entry<UK, UV> firstEntry() throws Exception;
  Map.Entry<UK, UV> lastEntry() throws Exception;

  Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey) throws Exception;

  void clearRange(UK fromKey, UK toKey) throws Exception;
  void clearRangeUntil(UK endUserKey) throws Exception;
  void clearRangeFrom(UK startUserKey) throws Exception;
}
```
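
As a rough illustration of why the map-like API would not put extra
constraints on the state backends, the sketch below layers a single-value
view on top of a multi-map. All names here (SortedMultiMapApi,
SingleValueView, TreeBackedMultiMap) are hypothetical, only a subset of the
proposed methods is shown, and the TreeMap-backed class exists just to make
the example runnable.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical subset of the proposed multi-map API. */
interface SortedMultiMapApi<UK, UV> {
    void put(UK key, Iterable<UV> values) throws Exception;
    Iterable<UV> valueAt(UK key) throws Exception;
    Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey) throws Exception;
}

/** Map-like view: each key holds at most one value. */
class SingleValueView<UK, UV> {
    private final SortedMultiMapApi<UK, UV> delegate;

    SingleValueView(SortedMultiMapApi<UK, UV> delegate) {
        this.delegate = delegate;
    }

    /** Overwrites the old value by storing a one-element list. */
    void put(UK key, UV value) throws Exception {
        delegate.put(key, Collections.singletonList(value));
    }

    /** Returns the single value, or null if the key is unknown. */
    UV valueAt(UK key) throws Exception {
        Iterator<UV> it = delegate.valueAt(key).iterator();
        return it.hasNext() ? it.next() : null;
    }

    /** Unwraps each per-key Iterable into its single value. */
    Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey) throws Exception {
        List<Map.Entry<UK, UV>> result = new ArrayList<>();
        for (Map.Entry<UK, Iterable<UV>> e : delegate.readRange(fromKey, toKey)) {
            Iterator<UV> it = e.getValue().iterator();
            if (it.hasNext()) {
                result.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), it.next()));
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        SingleValueView<Long, String> view =
                new SingleValueView<>(new TreeBackedMultiMap<Long, String>());
        view.put(1L, "old");
        view.put(1L, "new");
        System.out.println(view.valueAt(1L)); // prints new
    }
}

/** TreeMap-backed multi-map, only here to make the sketch runnable. */
class TreeBackedMultiMap<UK extends Comparable<UK>, UV> implements SortedMultiMapApi<UK, UV> {
    private final NavigableMap<UK, List<UV>> entries = new TreeMap<>();

    @Override
    public void put(UK key, Iterable<UV> values) {
        List<UV> copy = new ArrayList<>();
        values.forEach(copy::add);
        entries.put(key, copy);
    }

    @Override
    public Iterable<UV> valueAt(UK key) {
        return entries.getOrDefault(key, Collections.emptyList());
    }

    @Override
    public Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey) {
        List<Map.Entry<UK, Iterable<UV>>> result = new ArrayList<>();
        for (Map.Entry<UK, List<UV>> e
                : entries.subMap(fromKey, true, toKey, true).entrySet()) {
            Iterable<UV> values = e.getValue();
            result.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), values));
        }
        return result;
    }
}
```

Whether such a view should live in Flink itself or in user code is exactly
the question raised in b) above.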


We believe we were also missing details regarding the state descriptor, and
I'm still a bit fuzzy on what to provide as type T in StateDescriptor<S
extends State, T>.
For the constructors, however, since we'd require a 
LexicographicalTypeSerializer implementation, we would propose the following 
three overloads similar to the MapStateDescriptor:
```
public class BinarySortedMultiMapStateDescriptor<UK, UV> extends
        StateDescriptor<BinarySortedMultiMapState<UK, UV>, Map<UK, List<UV>>/*?*/> {

    public BinarySortedMultiMapStateDescriptor(
            String name,
            LexicographicalTypeSerializer<UK> keySerializer,
            TypeSerializer<UV> valueSerializer) {}

    public BinarySortedMultiMapStateDescriptor(
            String name,
            LexicographicalTypeSerializer<UK> keySerializer,
            TypeInformation<UV> valueTypeInfo) {}

    public BinarySortedMultiMapStateDescriptor(
            String name,
            LexicographicalTypeSerializer<UK> keySerializer,
            Class<UV> valueClass) {}
}
```
Technically, we could have a LexicographicalTypeInformation as well (for the 
2nd overload) but don't currently see the need for that wrapper since these 
serializers are just needed for State - but maybe someone with more insights 
into this topic can advise.
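
To illustrate what a LexicographicalTypeSerializer for the key would have to
guarantee, here is a small, standalone sketch for long keys (e.g.
timestamps). It assumes the sort order described in the quoted message
below, i.e. comparing the serialized bytes one by one as unsigned values.
The class OrderPreservingLongKey and its methods are made up for this
example and do not use any Flink types; Arrays.compareUnsigned needs Java 9
or later.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical example of an order-preserving binary encoding for long keys. */
class OrderPreservingLongKey {

    /**
     * Big-endian bytes with the sign bit flipped. Without the flip, negative
     * values would start with 0xFF and sort after all positive values.
     */
    static byte[] serialize(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    /** Reverses serialize(). */
    static long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    /** Byte-by-byte comparison that treats every byte as unsigned. */
    static int compareSerialized(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        long[] unsorted = {300L, -1L, 0L, -5L, 1L};
        byte[][] keys = new byte[unsorted.length][];
        for (int i = 0; i < unsorted.length; i++) {
            keys[i] = serialize(unsorted[i]);
        }
        // Sorting the binary form yields the numeric order of the keys.
        Arrays.sort(keys, OrderPreservingLongKey::compareSerialized);
        for (byte[] key : keys) {
            System.out.println(deserialize(key)); // prints -5, -1, 0, 1, 300 (one per line)
        }
    }
}
```

A serializer like this could also expose Long::compare as its optional
Comparator for heap-based backends, since both produce the same order.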


A few further points with respect to the implementation:
- we'll have to find a suitable heap-based state backend implementation that 
integrates well with all current efforts (needs to be discussed)
- the StateProcessor API will have to receive appropriate extensions to read 
and write this new form of state



Nico


On Friday, 29 April 2022 14:25:59 CEST Nico Kruber wrote:
> Hi all,
> Yun, David M, David A, and I had an offline discussion and talked through a
> couple of details that emerged from the discussion here. We believe we have
> found a consensus on these points and would like to share our points for
> further feedback:
> 
> Let me try to get through the points that were opened in arbitrary order:
> 
> 
> 1. We want to offer a generic interface for sorted state, not just temporal
> state as proposed initially. We would like to...
> a) ...offer a single new state type similar to what TemporalListState was
> offering (so not offering something like TemporalValueState to keep the API
> slim).
> b) ...name it BinarySortedMultiMap<UK, UV> with Java-Object keys and values
> (I'll go into the API further below) - the naming stresses "Binary"
> because we want to make clear that this is what the sort will be based on
> (see below)
> c) ...have our own state descriptor
> (BinarySortedMultiMapStateDescriptor<UK, UV>) similar to
> MapStateDescriptor<UK, UV>
> d) ...require TypeSerializer implementations for the key to extend from
> LexicographicalTypeSerializer (details below)
> 
> 
> 2. LexicographicalTypeSerializer basically defines the sort order when
> retrieving values: it is based on the serialized binaries, comparing them
> byte by byte in an unsigned fashion. For heap-based state backends, these
> serializers can also optionally define a Comparator that doesn't require
> serialization but needs to retain the same sort order. We would provide
> implementations of the range-based operations that will iterate based on
> binary keys if this is not provided (by simply converting all relevant keys
> to their binary form and using an appropriate comparator).
> 
> ```
> public interface LexicographicalTypeSerializer<T> extends TypeSerializer<T> {
>   default Optional<Comparator<T>> findComparator() {
>     return Optional.empty();
>   }
> }
> ```
> 
> 
> 3. BinarySortedMultiMap<UK, UV> should offer the following API
> 
> ```
> public interface BinarySortedMultiMap<UK, UV> extends State {
>   void put(UK key, Collection<UV> values) throws Exception;
>   void add(UK key, UV value) throws Exception;
> 
>   Map.Entry<UK, UV> entryAt(UK key) throws Exception;
>   Map.Entry<UK, UV> firstEntry() throws Exception;
>   Map.Entry<UK, UV> lastEntry() throws Exception;
> 
>   Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey)
>       throws Exception;
>   Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey)
>       throws Exception;
>   Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey)
>       throws Exception;
> 
>   Iterable<Map.Entry<UK, UV>> clearRange(UK fromKey, UK toKey)
>       throws Exception;
>   Iterable<Map.Entry<UK, UV>> clearRangeUntil(UK endUserKey)
>       throws Exception;
>   Iterable<Map.Entry<UK, UV>> clearRangeFrom(UK startUserKey)
>       throws Exception;
> }
> ```
> 
> That's it for the core points - here are a few more things that came up
> and some arguments about the "why":
> 
> A1) Do we need value[AtOrBefore|AtOrAfter|Before|After]?
> -> We looked at various use cases and didn't find a strong need because you
> could always fall back to readRange*. In the interest of a slim API, we
> thought it would be best to start without these (we can always add them
> later)
> 
> A2) Should we support iterating backwards?
> -> We haven't found a compelling use case that needs this. If you need it,
> at least for some (?) use cases, you could negate the sort order through
> the serializer and achieve the same thing (unless you need to walk in two
> directions). Let's rather start with a slim API.
> 
> A3) Lazy vs. eager iteration
> -> Let's implement our iterators similarly to RocksDBMapIterator by eagerly
> retrieving a couple of values (defaults to 128 here) into a cache. This
> seems to be the best of both worlds without bloating the API
> 
> A4) ChangelogStateBackend
> -> Since we require TypeSerializer implementations for the key and those
> know the length to serialize (from other requirements, e.g. in the network
> stack), it isn't affected by our change except for delegating the new
> operations to the underlying state backend.
> 
> A5) Defining the binary sort order as one-by-one with unsigned bytes is fine
> because it is a very common thing among storage systems. Should a different
> binary-based state backend require something else, there could be a mapping
> function translating between different definitions.
> 
> A6) How to handle Duplicates
> -> We let the user handle this by storing a multi-map, i.e. multiple values
> for the (primary) sort key. If needed, users can sort these values manually.
> As long as we don't have a strong use case where this is not feasible, we
> don't need any implicit duplicate handling by the framework (Flink).
> 
> A7) readRangeUntil vs. headIterator and readRangeFrom vs. tailIterator
> -> We propose to use readRange*** because that seems more explicit/intuitive
> in what this is doing.
> 
> A8) readRange*** with inclusive/exclusive switch
> -> In the interest of a slim API, let's not provide that. The API above will
> interpret all keys as _inclusive_ and should a user need exclusive
> behaviour, they would in the worst case read one more entry - in most of
> the cases, however, this would be served from the cache anyway, so it's not
> much of a problem
> 
> A9) Why don't we want to provide a BinarySortedMap with value-like semantics
> similar to TemporalValueState?
> -> We'd like to keep the code overhead in Flink small and not provide two
> more state primitives but instead only a single one. For use cases where
> you don't want to handle lists, you can use the BinarySortedMultiMap with
> its put() method and a list with a single entry that would overwrite the
> old one. While retrieving the value(s), you can then assume the list is
> either empty or has a single entry similar to what you are currently doing
> in a WindowProcessFunction. You can also always add a thin wrapper to
> provide that under a more convenient API if you need to.
> 
> A10) effects on the CEPOperator?
> -> We don't have an overview yet. The buffering of events inside its
> `MapState<Long, List<IN>> elementQueueState`, however, is a pattern that
> would benefit from our MultiMap since a single add() operation wouldn't
> require you to read the whole list again.
> 
> 
> Sorry for the long email - we'd be happy to get more feedback and will
> incorporate this into the FLIP description soon.
> 
> 
> 
> Nico

Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton 
Wehner



Re: [DISCUSS] FLIP-220: Temporal State

Posted by Jingsong Li <ji...@gmail.com>.
+1 to generic interface for sorted state and Binary***State.

Very happy to be able to go one step further and thank you for your discussion.

Best,
Jingsong
