Posted to dev@flink.apache.org by Sean Z <xi...@gmail.com> on 2020/10/14 07:30:43 UTC

[DISCUSS] Support KeyedSortedMapState in DataStream API

Hi devs,

The current DataStream API does not support SortedMapState. Many use cases
rely on sorted time-series data, such as range queries or fetching the next
higher/lower key, and ordering is a natural property of time-series stream
processing. Therefore, we propose adding the KeyedSortedMapState
feature.
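
To make the use cases above concrete, here is a minimal sketch using a plain
java.util.TreeMap. It is not a Flink state API; the class and method names
(SortedLookupSketch, sampleSeries, range) are made up for illustration, and a
real sorted state would be scoped per key and backed by a state backend.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration only: an in-memory sorted map, not Flink state.
final class SortedLookupSketch {

    // Builds a small time series keyed by event timestamp (milliseconds).
    static NavigableMap<Long, String> sampleSeries() {
        NavigableMap<Long, String> series = new TreeMap<>();
        series.put(1000L, "a");
        series.put(2000L, "b");
        series.put(3000L, "c");
        series.put(4000L, "d");
        return series;
    }

    // Range query: entries with fromInclusive <= timestamp < toExclusive.
    static NavigableMap<Long, String> range(
            NavigableMap<Long, String> series, long fromInclusive, long toExclusive) {
        return series.subMap(fromInclusive, true, toExclusive, false);
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> series = sampleSeries();
        System.out.println(range(series, 2000L, 4000L)); // range query
        System.out.println(series.lowerKey(2500L));      // greatest key strictly below 2500
        System.out.println(series.higherKey(2500L));     // least key strictly above 2500
    }
}
```

These are the operations a sorted map state would presumably need to expose
efficiently on every state backend.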

There was some previous discussion [1] about SortedMapState, and the
thread was closed because the Blink code was expected to cover this
feature. However, the Blink code [2] has not been merged into the master
branch since then. The major concern is that comparison is inconsistent
between the heap and off-heap state backends. In RocksDB, comparison is
based on the serialized bytes, which makes supporting generic key types
challenging, whereas in the heap state backend, comparison relies on the
Comparable interface.

In my opinion, there are two possible solutions to this issue:
1. We could limit the key type to Long, since most use cases use a
timestamp as the key. This is easier to implement but rules out generic
key types.
2. We keep the different sorting behavior of the different state backends
and, in off-heap state backends, default to comparing the bytes produced by
the given serializer. Users can provide their own serializer if they
want to sort a custom type on RocksDB.
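
As a rough illustration of what a user-provided, order-preserving
serialization in option 2 could look like, here is a sketch for double keys.
The class OrderedDoubleEncoding is hypothetical and is not part of Flink or
Blink; it only shows the encoding step, not a full TypeSerializer. It assumes
the backend compares keys as unsigned bytes in lexicographic order, ignores
NaN, and needs Java 9+ for Arrays.compareUnsigned.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration; not an existing Flink or Blink class.
final class OrderedDoubleEncoding {

    // Encodes a double into 8 big-endian bytes whose unsigned lexicographic
    // order matches numeric order (NaN handling is out of scope here).
    static byte[] encode(double value) {
        long bits = Double.doubleToLongBits(value);
        // Negative values: flip all bits so larger magnitudes sort first.
        // Non-negative values: flip only the sign bit so they sort after negatives.
        bits ^= (bits < 0) ? -1L : Long.MIN_VALUE;
        return ByteBuffer.allocate(Long.BYTES).putLong(bits).array();
    }

    // Compares two doubles the way a byte-ordered store would see them.
    static int compareEncoded(double a, double b) {
        return Arrays.compareUnsigned(encode(a), encode(b));
    }

    public static void main(String[] args) {
        System.out.println(compareEncoded(-2.0, -1.0) < 0); // byte order agrees with numeric order
        System.out.println(compareEncoded(-1.0, 1.0) < 0);
    }
}
```

With an encoding like this, the byte-wise order on RocksDB and the
Comparable order on the heap backend would agree for the supported values.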

Looking forward to discussing this feature. Please share your ideas if you
have context on this. Thanks!

Best,
Xinghan

[1] https://issues.apache.org/jira/browse/FLINK-6219
[2]
https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java

Re: [DISCUSS] Support KeyedSortedMapState in DataStream API

Posted by Aljoscha Krettek <al...@apache.org>.
Do we think this will be useful for users or do we first want to 
introduce this for internal use cases, such as the Table API/SQL runner?

Aljoscha

On 15.10.20 10:35, Sean Z wrote:
> Hi Jark,
> 
> Thanks for the reply and sharing thoughts.
> Yes, negative long will make things complicated. We had the exact same
> issue when implementing our own sortedMapState.
> This could be solved by some special pre-defined serializers, and it looks
> like that's what blink did [1] as you mentioned.
> 
> Blink implementation could be a good starting point and it would be better
> to extend to support random key types as long as users provide their own
> serializer, which is what I mentioned in option 2. I'm more in favor of
> this approach too. But the support of random key types seems the main
> blocker that prevents this Blink feature being merged into Flink, from what
> I understand.
> 
> In my opinion, limiting the key type might be easier to implement and it is
> actually not a bad idea to start with lite/consistent/fixed features. but
> yeah I'm not sure what the Temporal State is trying to solve here for I
> don't have too much context on it.
> 
> 
> [1]
> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered/OrderedBytes.java
> 
> 
> Best,
> Xinghan
> 


Re: [DISCUSS] Support KeyedSortedMapState in DataStream API

Posted by Sean Z <xi...@gmail.com>.
Hi Jark,

Thanks for the reply and for sharing your thoughts.
Yes, negative Long values complicate things. We had exactly the same
issue when implementing our own sortedMapState.
This could be solved with special pre-defined serializers, and it looks
like that's what Blink did [1], as you mentioned.

The Blink implementation could be a good starting point, and it would be
better to extend it to support arbitrary key types as long as users provide
their own serializer, which is what I described in option 2. I'm more in
favor of this approach too. But from what I understand, support for
arbitrary key types seems to be the main blocker preventing this Blink
feature from being merged into Flink.

In my opinion, limiting the key type might be easier to implement, and it is
actually not a bad idea to start with a small, consistent, fixed feature
set. That said, I'm not sure what TemporalState is trying to solve here,
since I don't have much context on it.


[1]
https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered/OrderedBytes.java


Best,
Xinghan

On Wed, Oct 14, 2020 at 7:19 PM Jark Wu <im...@gmail.com> wrote:

> Hi,
>
> Thanks for bringing this discussion.
>
> I think limiting the key type to Long can't resolve the comparison problem,
> because the bytes order and value order of negative numbers is different.
> Unless, we limit the key type to positive Long. But how to check this
> before submitting a job?
>
> In Blink code, we keep different sorting behavior in different
> statebackends.
> We also supported sorted map state for various key types (almost all the
> atomic types).
> The idea is serializing the given type value into an ordered bytes, see
> more:
>
>
> https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered
>
> Best,
> Jark
>

Re: [DISCUSS] Support KeyedSortedMapState in DataStream API

Posted by Jark Wu <im...@gmail.com>.
Hi,

Thanks for bringing up this discussion.

I think limiting the key type to Long can't resolve the comparison problem,
because the byte-wise order and the numeric order of negative numbers differ.
We could limit the key type to positive Long values, but how would we check
this before submitting a job?
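
A small sketch of the mismatch, assuming keys are written as plain
big-endian two's-complement bytes and the store compares them as unsigned
bytes in lexicographic order. The class RawLongByteOrder is made up for
illustration, and Arrays.compareUnsigned needs Java 9+.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical demo class; not part of Flink.
final class RawLongByteOrder {

    // Plain big-endian two's-complement bytes (ByteBuffer's default order).
    static byte[] rawBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Compares two longs the way a byte-ordered store would see them.
    static int compareAsBytes(long a, long b) {
        return Arrays.compareUnsigned(rawBytes(a), rawBytes(b));
    }

    public static void main(String[] args) {
        // Numerically -1 < 1, but -1 is FF..FF in bytes and sorts after 00..01.
        System.out.println(Long.compare(-1L, 1L) < 0);  // numeric order
        System.out.println(compareAsBytes(-1L, 1L) > 0); // byte order disagrees
    }
}
```

For non-negative values the two orders agree, which is why restricting keys
to positive Long would sidestep the problem.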

In the Blink code, we keep different sorting behavior in the different
state backends.
We also supported sorted map state for various key types (almost all the
atomic types).
The idea is to serialize a value of the given type into order-preserving
bytes; see:

https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered
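
For readers unfamiliar with the idea, a common way to get order-preserving
bytes for a signed long is to flip the sign bit before writing big-endian
bytes. The sketch below shows that general technique only; it is not taken
from the Blink OrderedBytes code, and the class name OrderedLongEncoding is
hypothetical. Arrays.compareUnsigned needs Java 9+.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of a generic technique; not the Blink implementation.
final class OrderedLongEncoding {

    // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto
    // 0x00..00 through 0xFF..FF, so unsigned byte order equals numeric order.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    // Reverses encode() by flipping the sign bit back.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // Compares two longs by their encoded bytes, as a byte-ordered store would.
    static int compareEncoded(long a, long b) {
        return Arrays.compareUnsigned(encode(a), encode(b));
    }

    public static void main(String[] args) {
        System.out.println(compareEncoded(-1L, 1L) < 0); // negatives now sort first
        System.out.println(decode(encode(-42L)));        // round trip
    }
}
```
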

Best,
Jark

On Thu, 15 Oct 2020 at 06:46, Sean Z <xi...@gmail.com> wrote:

> Thanks for the reply! Look forward to learning more about this prototype.
> Is there any way that we could track this TemporalState like Jira issue? or
> should we start to create one in Jira? so anyone has interest like me,
> could be part of the loop. Besides, is there any written docs/code about
> the prototype so we could have more context about that?
>
> Best,
> Xinghan
>
>

Re: [DISCUSS] Support KeyedSortedMapState in DataStream API

Posted by Sean Z <xi...@gmail.com>.
Thanks for the reply! I look forward to learning more about this prototype.
Is there a way to track TemporalState, such as a Jira issue, or should we
create one, so that anyone interested, like me, can stay in the loop?
Also, are there any written docs or code for the prototype that would
give us more context?

Best,
Xinghan


On Wed, Oct 14, 2020 at 11:09 AM David Anderson <da...@alpinegizmo.com>
wrote:

> I'm very interested in this topic, and have even done some prototyping of
> solution 1 -- limiting the key type to Long -- which Nico Kruber and I
> called TemporalState in our prototype.
>
> I look forward to sharing what we learned, and to discussing this further,
> but I am completely overwhelmed with Flink Forward preparations at the
> moment.
>
> Best,
> David
>

Re: [DISCUSS] Support KeyedSortedMapState in DataStream API

Posted by David Anderson <da...@alpinegizmo.com>.
I'm very interested in this topic, and have even done some prototyping of
solution 1 -- limiting the key type to Long -- which Nico Kruber and I
called TemporalState in our prototype.

I look forward to sharing what we learned, and to discussing this further,
but I am completely overwhelmed with Flink Forward preparations at the
moment.

Best,
David

On Wed, Oct 14, 2020 at 9:30 AM Sean Z <xi...@gmail.com> wrote:

> Hi devs,
>
> Current DataStream API doesn't have SortedMapState supported. There are
> lots of use cases based on sorted time-series data like range-query or
> higher/lower key fetch, and ordered data seems like a nature of time-series
> stream processing. Therefore, we propose to support the KeyedSortedMapState
> feature.
>
> There were some previous discussions [1] about SortedMapState, and the
> thread was closed because blink code might cover this feature. However, the
> blink code[2] wasn't merged into the master branch since then. The major
> concern is the inconsistent comparison between heap/off-heap state
> backends. In RocksDB, the comparison should be based on bytes, which makes
> generic key types support challenging, and in heap state backend, the
> comparison is more about Comparable interface.
>
> There are two possible solutions to this issue in my opinion,
> 1. We could limit the key type to Long type, for most of the use cases are
> about timestamp as a key. It's easier to implement but brings limitations
> to support generic key types.
> 2. We keep the different sorting behavior of different state backends and
> set it to bytes comparison for given serialization by default in off-heap
> state backends. Let users provide their own specific serializer if they
> want to sort some customized type on RocksDB.
>
> Look forward to having some discussions about this feature. Please share
> your ideas if anyone has context on this. Thanks!
>
> Best,
> Xinghan
>
> [1] https://issues.apache.org/jira/browse/FLINK-6219
> [2]
>
> https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java
>