Posted to dev@flink.apache.org by SHI Xiaogang <sh...@gmail.com> on 2016/10/19 02:18:34 UTC

Add MapState for keyed streams

Hi all, I created the JIRA issue https://issues.apache.org/jira/browse/FLINK-4856 to
propose adding MapState to Flink.

MapStates are very useful in our daily jobs. For example, when implementing
DistinctCount, we store the values in a MapState, and the result for each
group (key) is exactly the number of entries in the MapState.
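A minimal sketch of the DistinctCount idea, using plain java.util collections instead of Flink state (an assumption made so the example runs on its own). The outer map is a hypothetical stand-in for the runtime's per-key scoping, holding one map per group, and the inner map plays the role of that group's MapState. The class name DistinctCount and its process() method are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation: outer map = one "MapState" per group (key),
// inner map = the MapState itself, used as a set of seen values.
class DistinctCount {
    private final Map<String, Map<String, Boolean>> statePerGroup = new HashMap<>();

    /** Processes one (group, value) record and returns the group's distinct count. */
    int process(String group, String value) {
        Map<String, Boolean> mapState =
                statePerGroup.computeIfAbsent(group, g -> new HashMap<>());
        mapState.put(value, Boolean.TRUE); // a duplicate value overwrites, it adds no entry
        return mapState.size();            // distinct count == number of entries
    }
}
```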

In my opinion, the methods provided by the MapState may include:
* void put(K key, V value)
* V get(K key)
* Iterable<K> keys()
* Iterable<V> values()
* Iterator<Tuple2<K, V>> iterator()
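The proposed method set can be sketched as a Java interface with a heap-backed implementation. ProposedMapState, HeapMapState and the local Tuple2 class are hypothetical names for this sketch; Tuple2 only mimics the public f0/f1 fields of Flink's Tuple2 so that the code compiles without Flink on the classpath.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Minimal stand-in for Flink's Tuple2 (public f0/f1 fields), for this sketch only.
class Tuple2<T0, T1> {
    public final T0 f0;
    public final T1 f1;
    Tuple2(T0 f0, T1 f1) { this.f0 = f0; this.f1 = f1; }
}

// The methods proposed above, written as a (hypothetical) Java interface.
interface ProposedMapState<K, V> {
    void put(K key, V value);
    V get(K key);
    Iterable<K> keys();
    Iterable<V> values();
    Iterator<Tuple2<K, V>> iterator();
}

// Heap-backed sketch that delegates to a HashMap.
class HeapMapState<K, V> implements ProposedMapState<K, V> {
    private final Map<K, V> map = new HashMap<>();

    public void put(K key, V value) { map.put(key, value); }
    public V get(K key) { return map.get(key); }
    public Iterable<K> keys() { return map.keySet(); }
    public Iterable<V> values() { return map.values(); }

    public Iterator<Tuple2<K, V>> iterator() {
        // Convert each Map.Entry into a Tuple2 lazily, one per next() call.
        Iterator<Map.Entry<K, V>> entries = map.entrySet().iterator();
        return new Iterator<Tuple2<K, V>>() {
            public boolean hasNext() { return entries.hasNext(); }
            public Tuple2<K, V> next() {
                Map.Entry<K, V> e = entries.next();
                return new Tuple2<>(e.getKey(), e.getValue());
            }
        };
    }
}
```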

Do you have any comments? Any feedback is appreciated.

Xiaogang

Re: Add MapState for keyed streams

Posted by SHI Xiaogang <sh...@gmail.com>.
The keys in the MapState are different from the keys in the KeyedStream.

There are two types of keys: StreamKeys and UserKeys.

StreamKeys are those created by the keyBy() operation.

Like other states, MapStates are created under StreamKeys, i.e. each StreamKey
has its own MapState. The keys of the MapState are called UserKeys. UserKeys
are not necessarily the same as the StreamKeys.
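To illustrate the distinction, here is a hypothetical simulation (not Flink's backend code) of how a keyed backend could scope a MapState: the runtime sets the current StreamKey before each record, and put()/get() then operate only on that StreamKey's own map of UserKeys. ScopedMapState and setCurrentKey() are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical two-level layout: StreamKey -> (UserKey -> value).
class ScopedMapState<SK, UK, UV> {
    private final Map<SK, Map<UK, UV>> backend = new HashMap<>();
    private SK currentStreamKey;

    /** Called by the (simulated) runtime before each record is processed. */
    void setCurrentKey(SK streamKey) { this.currentStreamKey = streamKey; }

    /** Writes a UserKey entry into the map of the current StreamKey. */
    void put(UK userKey, UV value) {
        backend.computeIfAbsent(currentStreamKey, k -> new HashMap<>()).put(userKey, value);
    }

    /** Reads a UserKey entry from the map of the current StreamKey only. */
    UV get(UK userKey) {
        Map<UK, UV> userMap = backend.get(currentStreamKey);
        return userMap == null ? null : userMap.get(userKey);
    }
}
```

The same UserKey ("page-A" below) can therefore hold different values under different StreamKeys.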

Regards
Xiaogang

2016-10-19 17:28 GMT+08:00 Aljoscha Krettek <al...@apache.org>:


Re: Add MapState for keyed streams

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
just making sure I understand this correctly. Would the MapState keys be
the same keys as the ones provided when creating the KeyedStream, or
different keys?

As an example, would it be like this:

DataStream<Tuple2<K, V>> input = ...;
KeyedStream<Tuple2<K, V>, Tuple> keyed = input.keyBy(0);
keyed.map((Tuple2<K, V> value) -> mapState.put(value.f0, value.f1));

or like this:

DataStream<Tuple3<K, V1, V2>> input = ...;
KeyedStream<Tuple3<K, V1, V2>, Tuple> keyed = input.keyBy(0);
// notice how the key for the MapState (f1) is different from the key of the KeyedStream (f0)
keyed.map((Tuple3<K, V1, V2> value) -> mapState.put(value.f1, value.f2));

Cheers,
Aljoscha


On Wed, 19 Oct 2016 at 10:55 Till Rohrmann <tr...@apache.org> wrote:


Re: Add MapState for keyed streams

Posted by Jark Wu <wu...@alibaba-inc.com>.
That makes sense! Maybe we can make MapState implement the Iterable interface, so that we can use a for-each loop directly on a MapState.
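A minimal sketch of this suggestion, assuming a hypothetical IterableMapState class backed by a HashMap: implementing Iterable<Map.Entry<K, V>> is all that Java's for-each loop requires.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical state type that can be used directly in a for-each loop.
class IterableMapState<K, V> implements Iterable<Map.Entry<K, V>> {
    private final Map<K, V> map = new HashMap<>();

    void put(K key, V value) { map.put(key, value); }

    @Override
    public Iterator<Map.Entry<K, V>> iterator() {
        return map.entrySet().iterator();
    }
}
```

Usage: `for (Map.Entry<String, Integer> e : state) { ... }`.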

- Jark Wu 

> On Oct 19, 2016, at 5:48 PM, SHI Xiaogang <sh...@gmail.com> wrote:


Re: Add MapState for keyed streams

Posted by Jark <im...@gmail.com>.
That makes sense! Maybe we can make MapState implement the Iterable interface.

> On Oct 19, 2016, at 5:48 PM, SHI Xiaogang <sh...@gmail.com> wrote:


Re: Add MapState for keyed streams

Posted by SHI Xiaogang <sh...@gmail.com>.
Hi Jark

If the state is very large, returning a Set<Map.Entry<K, V>> may occupy a lot
of memory, since all entries would have to be materialized at once.

By wrapping the returned iterator, we can easily implement a method
returning Iterable<Map.Entry<K, V>>.
Users can then use the returned Iterable in a for-each loop.
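The wrapping idea can be sketched as follows. A TreeMap stands in (as an assumption) for a backend that is read through a cursor-style iterator; LazyEntries and its entriesRead counter are hypothetical and exist only to show that entries are pulled on demand instead of being copied into a Set up front.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: expose entries as a lazy Iterable view over a backend cursor.
class LazyEntries<K, V> {
    private final TreeMap<K, V> backend = new TreeMap<>();
    int entriesRead = 0; // instrumentation: number of entries actually pulled

    void put(K key, V value) { backend.put(key, value); }

    /** Returns a lazy Iterable instead of materializing a Set<Map.Entry<K, V>>. */
    Iterable<Map.Entry<K, V>> entries() {
        // Each call to iterator() on the returned Iterable opens a fresh cursor.
        return () -> {
            Iterator<Map.Entry<K, V>> cursor = backend.entrySet().iterator();
            return new Iterator<Map.Entry<K, V>>() {
                @Override public boolean hasNext() { return cursor.hasNext(); }
                @Override public Map.Entry<K, V> next() {
                    entriesRead++;
                    return cursor.next();
                }
            };
        };
    }
}
```

Because nothing is copied, breaking out of a loop early reads only the entries visited so far, and the same Iterable can be reused in several for-each loops.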

Regards
Xiaogang



2016-10-19 17:43 GMT+08:00 Jark Wu <wu...@alibaba-inc.com>:


Re: Add MapState for keyed streams

Posted by Aljoscha Krettek <al...@apache.org>.
Perfect! Then it's pretty much what we discussed here:
https://issues.apache.org/jira/browse/FLINK-3947 and I'm very much in
favour of that. Only the RocksDB implementation could be a bit tricky,
but it should be doable.
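One way the RocksDB side could work (an assumption for illustration, not a description of the actual implementation): encode each (StreamKey, UserKey) pair as a single composite key in a sorted key-value store, so get() and put() touch one entry, and iterating one StreamKey's map becomes a prefix range scan. In the sketch a TreeMap stands in for RocksDB; CompositeKeyMapState, prefix() and the string encoding are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical composite-key layout in a sorted key-value store.
class CompositeKeyMapState {
    private final TreeMap<String, String> store = new TreeMap<>(); // stands in for RocksDB

    private static String prefix(String streamKey) {
        // Length-prefixing the StreamKey keeps prefixes unambiguous.
        return streamKey.length() + ":" + streamKey + "/";
    }

    void put(String streamKey, String userKey, String value) {
        store.put(prefix(streamKey) + userKey, value);
    }

    String get(String streamKey, String userKey) {
        return store.get(prefix(streamKey) + userKey);
    }

    /** Lists the UserKeys under one StreamKey via a range (prefix) scan. */
    List<String> userKeys(String streamKey) {
        String p = prefix(streamKey);
        SortedMap<String, String> range = store.subMap(p, p + Character.MAX_VALUE);
        List<String> keys = new ArrayList<>();
        for (String compositeKey : range.keySet()) {
            keys.add(compositeKey.substring(p.length()));
        }
        return keys;
    }
}
```

With a bare separator instead of the length prefix, a StreamKey that itself contains the separator could make one key's scan pick up another key's entries.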

Cheers,
Aljoscha

On Wed, 19 Oct 2016 at 11:43 Jark Wu <wu...@alibaba-inc.com> wrote:


Re: Add MapState for keyed streams

Posted by Jark Wu <wu...@alibaba-inc.com>.
Hi Xiaogang,

I think returning Set<Map.Entry<K,V>> may be better than returning Iterator<Map.Entry<K,V>>,
because users can use a for-each loop on a Set but not on an Iterator, and can still get iterator access via set.iterator().
Map.entrySet() may also be more familiar to users.


- Jark Wu 

> On Oct 19, 2016, at 5:18 PM, SHI Xiaogang <sh...@gmail.com> wrote:


Re: Add MapState for keyed streams

Posted by SHI Xiaogang <sh...@gmail.com>.
Agreed.

contains(K key) should be provided.
The iterator() method should return Iterator<Map.Entry<K, V>> instead of
Iterator<Tuple2<K, V>>.

In addition, size() could also be provided.

With these methods, MapStates would look very similar to Java Maps, so users
should find them easy to use.
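The revised method set can be sketched as below. RevisedMapState and HeapRevisedMapState are hypothetical names, and a HashMap backs the sketch.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical interface: contains() and size() added, iterator() yields Map.Entry.
interface RevisedMapState<K, V> {
    void put(K key, V value);
    V get(K key);
    boolean contains(K key);
    int size();
    Iterable<K> keys();
    Iterable<V> values();
    Iterator<Map.Entry<K, V>> iterator();
}

// Heap-backed sketch; HashMap provides each operation directly.
class HeapRevisedMapState<K, V> implements RevisedMapState<K, V> {
    private final Map<K, V> map = new HashMap<>();

    @Override public void put(K key, V value) { map.put(key, value); }
    @Override public V get(K key) { return map.get(key); }
    @Override public boolean contains(K key) { return map.containsKey(key); }
    @Override public int size() { return map.size(); }
    @Override public Iterable<K> keys() { return map.keySet(); }
    @Override public Iterable<V> values() { return map.values(); }
    @Override public Iterator<Map.Entry<K, V>> iterator() { return map.entrySet().iterator(); }
}
```

A separate contains() is useful even though get() exists: with a HashMap backend, get() returns null both for a missing key and for a key mapped to null.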

Regards,
Xiaogang


2016-10-19 16:55 GMT+08:00 Till Rohrmann <tr...@apache.org>:


Re: Add MapState for keyed streams

Posted by Till Rohrmann <tr...@apache.org>.
Hi Xiaogang,

I really like your proposal and think that this would be a valuable
addition to Flink :-)

For convenience, we could also add contains(K key).

Java's Map interface returns a Set of Map.Entry objects from entrySet() (which
is the equivalent of iterator() in our interface). The Entry interface not
only gives access to the key and value of the map entry but also allows
setting a new value for that key via setValue (even though this is an
optional operation). Maybe we want to offer something similar for the
entries returned by the iterator method.
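A minimal sketch of write-through entries, assuming (hypothetically) a backend whose iterator hands out entry objects that are detached copies rather than views into the state, as would be the case for entries deserialized from disk. Each entry is an AbstractMap.SimpleEntry whose setValue() also writes the new value back. A HashMap stands in for the backend, and WriteThroughMapState is an illustrative name.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical state whose iterator entries support setValue() by writing through.
class WriteThroughMapState<K, V> {
    private final Map<K, V> backend = new HashMap<>();

    void put(K key, V value) { backend.put(key, value); }
    V get(K key) { return backend.get(key); }
    boolean contains(K key) { return backend.containsKey(key); }

    /** Iterates over detached entries whose setValue() writes back into the state. */
    Iterator<Map.Entry<K, V>> iterator() {
        Iterator<K> keys = backend.keySet().iterator();
        return new Iterator<Map.Entry<K, V>>() {
            @Override public boolean hasNext() { return keys.hasNext(); }

            @Override public Map.Entry<K, V> next() {
                K currentKey = keys.next();
                // A detached copy of the entry, like one deserialized from a backend.
                return new AbstractMap.SimpleEntry<K, V>(currentKey, backend.get(currentKey)) {
                    @Override public V setValue(V value) {
                        // Replacing an existing key's value is not a structural change,
                        // so the ongoing HashMap key iteration remains valid.
                        backend.put(currentKey, value);
                        return super.setValue(value); // keep this copy consistent too
                    }
                };
            }
        };
    }
}
```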

Cheers,
Till

On Wed, Oct 19, 2016 at 4:18 AM, SHI Xiaogang <sh...@gmail.com>
wrote:
