Posted to dev@beam.apache.org by Kiley Sok <ki...@google.com> on 2021/09/08 01:15:00 UTC

Re: MapState API

This section of the comment [1] says that calling read() is required for
the condition to be evaluated.

Example in the comment is:
ReadableState<V> maybePut = mapState.putIfAbsent(keyA, valueA);
mapState.remove(keyA);
maybePut.read(); // writes to map

Also, it states that it should be a deferred read-followed-by-write.
Is it deferred until read() is called?

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java#L60-L64
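
To make the behaviour in that comment concrete, here is a toy in-memory
model of a deferred read-followed-by-write. It is only a sketch, not Beam's
implementation: DeferredMapSketch is a hypothetical stand-in for MapState,
a plain Supplier stands in for ReadableState, and I am assuming the handle
yields the previous value (null if the key was absent), as java.util.Map
does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy model only, NOT Beam's MapState. All names here are hypothetical.
class DeferredMapSketch<K, V> {
  private final Map<K, V> store = new HashMap<>();

  // Returns a handle; the conditional write runs only when get() is called on the handle.
  Supplier<V> putIfAbsent(K key, V value) {
    return () -> store.putIfAbsent(key, value); // previous value, or null if the key was absent
  }

  void remove(K key) {
    store.remove(key);
  }

  V get(K key) {
    return store.get(key);
  }

  public static void main(String[] args) {
    DeferredMapSketch<String, String> mapState = new DeferredMapSketch<>();
    Supplier<String> maybePut = mapState.putIfAbsent("keyA", "valueA");
    System.out.println(mapState.get("keyA")); // prints "null": nothing has been written yet
    mapState.remove("keyA");                  // removes nothing, the key is still absent
    maybePut.get();                           // the write happens here, after the remove
    System.out.println(mapState.get("keyA")); // prints "valueA"
  }
}
```

In this model the remove() is effectively lost, because the write is ordered
by when the handle is read and not by when putIfAbsent was called.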

On Tue, Aug 17, 2021 at 1:21 PM Luke Cwik <lc...@google.com> wrote:

> I believe that all our ReadableState calls are point in time snapshots of
> what the underlying state is plus any changes that happened within the
> bundle already (e.g. you can see your own writes/appends/clears assuming
> that the creation of the ReadableState happened after the mutation).
>
> Here are some examples of what I think should happen:
>
> #1
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> ReadableState<T> read = mapState.get(keyA);   // value should equal "valueA"
>
> #2
> ReadableState<T> read = mapState.get(keyA);   // read should equal prior value
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>
> #3
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> // any calls to MapState put/remove for keyA can effectively drop the write that would be pending via maybePut
> // maybePut should also be resolved before the write happens to ensure that it can return true/false truthfully if the state was empty
>
> #4
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> // in a future bundle for same key and window
> ReadableState<T> read = mapState.get(keyA);   // value should equal "valueA"
>
> I think #1, #3, and #4 don't currently work and #2 doesn't work if the
> mapState didn't contain keyA
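
A rough sketch of the ordering that examples #1 and #2 ask for, written as
a toy model. The class name and the use of Supplier in place of
ReadableState are my own assumptions, and it resolves everything eagerly
for simplicity, which a real runner would presumably avoid. It only shows
the observable result: each handle reflects the calls made before it was
created.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy model only; the names are hypothetical and this is not Beam's API.
class OrderedMapSketch<K, V> {
  private final Map<K, V> store = new HashMap<>();

  // Applies the conditional write at call time; the handle reports whether the key was absent.
  Supplier<Boolean> putIfAbsent(K key, V value) {
    boolean wasAbsent = store.putIfAbsent(key, value) == null;
    return () -> wasAbsent;
  }

  // Captures the value visible at call time, so later mutations do not change the result.
  Supplier<V> get(K key) {
    V snapshot = store.get(key);
    return () -> snapshot;
  }

  public static void main(String[] args) {
    // Example #1: a get() created after putIfAbsent sees the new value.
    OrderedMapSketch<String, String> first = new OrderedMapSketch<>();
    first.putIfAbsent("keyA", "valueA");
    System.out.println(first.get("keyA").get()); // prints "valueA"

    // Example #2: a get() created before putIfAbsent keeps the prior value (absent here).
    OrderedMapSketch<String, String> second = new OrderedMapSketch<>();
    Supplier<String> read = second.get("keyA");
    Supplier<Boolean> maybePut = second.putIfAbsent("keyA", "valueA");
    System.out.println(read.get());     // prints "null"
    System.out.println(maybePut.get()); // prints "true"
  }
}
```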
>
> On Tue, Aug 17, 2021 at 12:28 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> Can the change be observed in that case? I think the semantics should be
>> that the timing of the actual write cannot be observed.
>>
>> Kenn
>>
>> On Tue, Aug 17, 2021 at 10:19 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> Yeah, the "or state is committed" means that we should resolve it before
>>> any additional writes but that only addresses part of the surprise I had.
>>>
>>> I would have expected that the putIfAbsent would force a resolution
>>> before transitioning to process the next key and window pair regardless
>>> of whether the read() was invoked or there were additional writes.
>>>
>>>
>>> On Mon, Aug 16, 2021 at 10:04 AM Kenneth Knowles <ke...@apache.org>
>>> wrote:
>>>
>>>> I think it is an implementation bug. The javadoc for putIfAbsent says
>>>> "When {@code read()} is called on the result or state is committed, it
>>>> forces a read of the map and reconciliation with any pending modifications."
>>>>
>>>> My reading of this is that the value changes for anything that happens
>>>> after the call to putIfAbsent. It would be good to make this clear, that
>>>> any subsequent observation of this cell should observe the new value.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Aug 13, 2021 at 9:32 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Yeah - I remember thinking that the computeIfAbsent/putIfAbsent
>>>>> semantics were very weird. I almost would have preferred not having those
>>>>> methods in MapState, even though they can be useful.
>>>>>
>>>>> On Fri, Aug 13, 2021 at 9:21 AM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> I was surprised to see the MapState API for
>>>>>> computeIfAbsent/putIfAbsent only performs the write if the ReadableState
>>>>>> that is returned is resolved.
>>>>>>
>>>>>> For example:
>>>>>> ReadableState<String> value = mapState.putIfAbsent("key", "new value maybe");
>>>>>> does nothing until
>>>>>> value.read();
>>>>>>
>>>>>> Forgetting to call value.read() seems like an easy mistake for
>>>>>> developers coming from the Java Map API, which always performs the
>>>>>> write.
>>>>>>
>>>>>

Re: MapState API

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Sep 8, 2021 at 1:15 PM Luke Cwik <lc...@google.com> wrote:
>
> I believe we should consider these bugs and fix them since they are surprising behaviors for users based upon what people are used to from similar APIs in other Map constructs.

+1

> The benefit of the changelog approach is to allow for putIfAbsent to have a blind append if read is never called. To expand on my example of putIfAbsent:
> First Bundle (assuming no read in this bundle):
> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);  // Ignore this since we now know that the map has an entry
> Produces one blind append of (keyA, (PutIfAbsent, value1))
>
> Second Bundle:
> ReadableState<V> get = mapState.get(keyA);
> V value = get.read();
> The underlying values will be a list of changes applied in order to this map. The first read that sees multiple values should clear the list and resolve it similar to what a combining state does, returning the only Put (since Put = clear + append) in the list or the first PutIfAbsent.
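
The resolution rule described above (last Put if there is one, otherwise
the first PutIfAbsent) can be sketched roughly as follows. This is just an
illustration under my own assumptions: ChangelogSketch, Op and Entry are
hypothetical names, the log is a plain in-memory list and not a real
multimap state, and remove is left out because the description above only
covers Put and PutIfAbsent.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the changelog idea; none of these types exist in Beam.
class ChangelogSketch {
  enum Op { PUT, PUT_IF_ABSENT }

  static final class Entry<V> {
    final Op op;
    final V value;

    Entry(Op op, V value) {
      this.op = op;
      this.value = value;
    }
  }

  // Scans start to end: returns the last PUT if any, else the first PUT_IF_ABSENT, else null.
  static <V> V resolve(List<Entry<V>> log) {
    Entry<V> lastPut = null;
    Entry<V> firstPutIfAbsent = null;
    for (Entry<V> entry : log) {
      if (entry.op == Op.PUT) {
        lastPut = entry;
      } else if (firstPutIfAbsent == null) {
        firstPutIfAbsent = entry;
      }
    }
    if (lastPut != null) {
      return lastPut.value;
    }
    return firstPutIfAbsent == null ? null : firstPutIfAbsent.value;
  }

  public static void main(String[] args) {
    List<Entry<String>> log = Arrays.asList(
        new Entry<String>(Op.PUT_IF_ABSENT, "value1"),
        new Entry<String>(Op.PUT_IF_ABSENT, "value2"));
    System.out.println(resolve(log)); // prints "value1"
  }
}
```

After resolving, a reader could rewrite the list down to the single winning
entry, which is the "clear the list and resolve it" step mentioned above.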
>
> On Wed, Sep 8, 2021 at 11:51 AM Kiley Sok <ki...@google.com> wrote:
>>
>> Would this be a breaking change then? Going by your first examples, it's no longer a deferred read-then-write.
>>
>> I'm not seeing the benefit of having a changelog. If I'm reading your examples correctly, it's saying it should evaluate putIfAbsent immediately and store the result in ReadableState until read is called?
>>
>> On Wed, Sep 8, 2021 at 8:54 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>> I agree with the comment that you linked to, Kiley, and with your analysis of what happens today. Yes, I believe the write will not happen until read() is invoked on the ReadableState<V> returned from putIfAbsent(). The rest of the comment also says that if there are multiple putIfAbsent calls, then which one is written depends on which ReadableState<V> has read() called on it, leading to this behavior:
>>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>>> maybePut2.read(); // If keyA was absent, then it will have value2 otherwise it will maintain its old value.
>>> maybePut1.read(); // This is effectively ignored
>>>
>>> I started this thread because a lot of these behaviors are surprising and we should fix them to have ordering semantics based upon the actual order of interactions with the MapState and not the current semantics which rely on read() being invoked.
>>>
>>> Conveniently, we don't have to resolve the read() immediately after a put/putIfAbsent and can still have a blind append on to a multimap state. We can treat the underlying value store as a changelog from which we can figure out the true value when performing a future read: if we store <Operation, Value> (where Operation is Put or PutIfAbsent), we can read from the start to the end looking for the last Put in the list and, if none exists, the first PutIfAbsent.
>>>
>>> On Tue, Sep 7, 2021 at 6:26 PM Kiley Sok <ki...@google.com> wrote:
>>>>
>>>> As a side note, SEO needs some work. The first result for "mapstate beam" on Google is from version 2.2.0
>>>>

Re: MapState API

Posted by Ke Wu <ke...@gmail.com>.
Awesome, looking forward to it.

> On Sep 10, 2021, at 10:30 AM, Luke Cwik <lc...@google.com> wrote:
> 
> Yes, https://github.com/apache/beam/pull/15238 is in progress for MapState and SetState.
>
> On Fri, Sep 10, 2021 at 9:22 AM Ke Wu <ke.wu.cs@gmail.com> wrote:
> Another question on this topic, is any work planned to add the map state support in portable mode [1], same for set state, list state?
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L337
>
>> On Sep 9, 2021, at 9:31 AM, Luke Cwik <lcwik@google.com> wrote:
>> 
>> The underlying state implementation that the FnApiDoFnRunner uses has a "close" method specifically meant to flush any pending operations so that part should be trivial. I don't think persisting the "changelog" would be difficult but migrating to using it in the future would cause issues for job update since we would be changing the encoding so deciding upfront which approach we want would be useful.
>> 
>> 
>> 
>> On Thu, Sep 9, 2021 at 9:14 AM Reuven Lax <relax@google.com> wrote:
>> I think a changelog could be kept in memory - persisting a changelog seems overly complex. We would have to make sure to flush the changelog at the end of every bundle though, so it would only help if the map key was accessed multiple times in the same bundle. I don't think it's correct that we won't be able to remove the key. In your example, all of the calls are in a single processElement or a single bundle (I'm not sure if it's legal to maintain ReadableState references between elements within the same bundle though). In this case the call to remove would also be an operation journaled into the log, so after flushing the whole log at the end of the bundle the key would be removed.
>> 
>> I don't think the above would be that hard, but IMO it's also fine to do something simpler initially.
>> 
>> Reuven
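
A minimal sketch of the in-memory journal idea from this message, assuming
the journal is replayed in call order when the bundle finishes.
BundleJournalSketch and flushAtEndOfBundle are hypothetical names, and a
plain HashMap stands in for whatever the runner actually persists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of an in-memory journal; this is not how any Beam runner is implemented.
class BundleJournalSketch<K, V> {
  private final Map<K, V> persisted = new HashMap<>(); // stands in for runner state
  private final List<Consumer<Map<K, V>>> journal = new ArrayList<>();

  void put(K key, V value) {
    journal.add(map -> map.put(key, value));
  }

  void putIfAbsent(K key, V value) {
    journal.add(map -> map.putIfAbsent(key, value));
  }

  void remove(K key) {
    journal.add(map -> map.remove(key));
  }

  // Replays the journal in call order against the persisted map, then discards it.
  void flushAtEndOfBundle() {
    for (Consumer<Map<K, V>> operation : journal) {
      operation.accept(persisted);
    }
    journal.clear();
  }

  boolean isPersisted(K key) {
    return persisted.containsKey(key);
  }

  public static void main(String[] args) {
    BundleJournalSketch<String, String> mapState = new BundleJournalSketch<>();
    mapState.putIfAbsent("keyA", "value1");
    mapState.putIfAbsent("keyA", "value2");
    mapState.remove("keyA");
    mapState.flushAtEndOfBundle();
    System.out.println(mapState.isPersisted("keyA")); // prints "false"
  }
}
```

Because remove() is journaled like any other operation, the key is gone
once the bundle is flushed, and nothing from the journal has to be
persisted.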
>> 
>> On Wed, Sep 8, 2021 at 4:13 PM Kiley Sok <kileysok@google.com> wrote:
>> Is adding the complexity of the changelog worth it instead of resolving immediately? We would be storing much more persisted data and wouldn't be able to clear the key even on a remove().
>> 
>> For example:
>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>> mapState.remove(keyA); // still need to keep the changelog
>> V maybePut2Value = maybePut2.read(); // should return value1
>> 


Re: MapState API

Posted by Luke Cwik <lc...@google.com>.
Yes, https://github.com/apache/beam/pull/15238 is in progress for MapState
and SetState.

On Fri, Sep 10, 2021 at 9:22 AM Ke Wu <ke...@gmail.com> wrote:

> Another question on this topic, is any work planned to add the map state
> support in portable mode [1], same for set state, list state?
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L337
>
>

Re: MapState API

Posted by Ke Wu <ke...@gmail.com>.
Another question on this topic: is any work planned to add map state support in portable mode [1]? The same question applies to set state and list state.

[1] https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L337

> On Sep 9, 2021, at 9:31 AM, Luke Cwik <lc...@google.com> wrote:
> 
> The underlying state implementation that the FnApiDoFnRunner uses has a "close" method specifically meant to flush any pending operations, so that part should be trivial. I don't think persisting the "changelog" would be difficult, but migrating to it in the future would cause issues for job update since we would be changing the encoding, so deciding upfront which approach we want would be useful.
> 
> 
> 
> On Thu, Sep 9, 2021 at 9:14 AM Reuven Lax <relax@google.com> wrote:
> I think a changelog could be kept in memory - persisting a changelog seems overly complex. We would have to make sure to flush the changelog at the end of every bundle though, so it would only help if the map key was accessed multiple times in the same bundle. I don't think it's correct that we won't be able to remove the key. In your example, all of the calls are in a single processElement or a single bundle (I'm not sure if it's legal to maintain ReadableState references between elements within the same bundle though). In this case the call to remove would also be an operation journaled into the log, so after flushing the whole log at the end of the bundle the key would be removed.
> 
> I don't think the above would be that hard, but IMO it's also fine to do something simpler initially.
> 
> Reuven
> 
> On Wed, Sep 8, 2021 at 4:13 PM Kiley Sok <kileysok@google.com> wrote:
> Is adding the complexity of the changelog worth it instead of resolving immediately? We would be storing much more persisted data and wouldn't be able to clear the key even on a remove().
> 
> For example:
> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
> mapState.remove(keyA); // still need to keep the changelog
> V maybePut2Value = maybePut2.read(); // should return value1
> 
> On Wed, Sep 8, 2021 at 1:15 PM Luke Cwik <lcwik@google.com> wrote:
> I believe we should consider these bugs and fix them since they are surprising behaviors for users based upon what people are used to from similar APIs in other Map constructs.
> 
> The benefit of the changelog approach is to allow for putIfAbsent to have a blind append if read is never called. To expand on my example of putIfAbsent:
> First Bundle (assuming no read in this bundle):
> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);  // Ignore this since we now know that the map has an entry
> Produces one blind append of (keyA, (PutIfAbsent, value1))
> 
> Second Bundle:
> ReadableState<V> get = mapState.get(keyA);
> V value = get.read();
> The underlying values will be a list of changes applied in order to this map. The first read that sees multiple values should clear the list and resolve it similar to what a combining state does, returning the only Put (since Put = clear + append) in the list or the first PutIfAbsent.
> 
> On Wed, Sep 8, 2021 at 11:51 AM Kiley Sok <kileysok@google.com> wrote:
> Would this be a breaking change then? Going by your first examples, it's no longer a deferred read-then-write.
> 
> I'm not seeing the benefit of having a changelog. If I'm reading your examples correctly, it's saying it should evaluate putIfAbsent immediately and store the result in ReadableState until read is called?
> 
> On Wed, Sep 8, 2021 at 8:54 AM Luke Cwik <lcwik@google.com> wrote:
> I agree with the comment that you linked to Kiley and your analysis of what happens today. Yes I believe the write will not happen until the read() is invoked on ReadableState<V> returned from the putIfAbsent(). The rest of the comment also says that if there are multiple putIfAbsent calls then which one is written is dependent on which ReadableState<V> read() is called leading to this behavior:
> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
> maybePut2.read(); // If keyA was absent, then it will have value2 otherwise it will maintain its old value.
> maybePut1.read(); // This is effectively ignored
> 
> I started this thread because a lot of these behaviors are surprising and we should fix them to have ordering semantics based upon the actual order of interactions with the MapState and not the current semantics which rely on read() being invoked.
> 
> Conveniently we don't have to resolve the read() immediately after a put/putIfAbsent and can still have a blind append on to a multimap state. We can treat the underlying value store as a changelog which we can figure out the true value when performing a future read if we store <Operation, Value> (where Operation is Put or PutIfAbsent) and can read from the start to the end looking for the last Put in the list and if none exists the first PutIfAbsent.
> 
> On Tue, Sep 7, 2021 at 6:26 PM Kiley Sok <kileysok@google.com> wrote:
> As a side note, SEO needs some work. The first result for "mapstate beam" on Google is from version 2.2.0
> 
> On Tue, Sep 7, 2021 at 6:15 PM Kiley Sok <kileysok@google.com> wrote:
> This section of the comment [1] says that calling read() is required for the condition to be evaluated.
> 
> Example in the comment is:
> ReadableState<V> maybePut = mapState.putIfAbsent(keyA, valueA);
> mapState.remove(keyA);
> maybePut.read(); // writes to map
> 
>  Also, it states that it should be a deferred read-followed-by-write. Deferred until read is called?
> 
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java#L60-L64
> On Tue, Aug 17, 2021 at 1:21 PM Luke Cwik <lcwik@google.com> wrote:
> I believe that all our ReadableState calls are point in time snapshots of what the underlying state is plus any changes that happened within the bundle already (e.g. you can see your own writes/appends/clears assuming that the creation of the ReadableState happened after the mutation).
> 
> Here are some examples of what I think should happen:
> 
> #1
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> ReadableState<T> read = mapState.get(keyA);   // value should equal "valueA"
> 
> #2
> ReadableState<T> read = mapState.get(keyA);   // read should equal prior value
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> 
> #3
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> // any calls to MapState put/remove for keyA can effectively drop the write that would be pending via maybePut
> // maybePut should also be resolved before the write happens to ensure that it can return true/false truthfully if the state was empty
> 
> #4
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> // in a future bundle for same key and window
> ReadableState<T> read = mapState.get(keyA);   // value should equal "valueA"
> 
> I think #1, #3, and #4 don't currently work and #2 doesn't work if the mapState didn't contain keyA
> 
> On Tue, Aug 17, 2021 at 12:28 PM Kenneth Knowles <kenn@apache.org> wrote:
> Can the change be observed in that case? I think the semantics should be that the timing of the actual write cannot be observed.
> 
> Kenn
> 
> On Tue, Aug 17, 2021 at 10:19 AM Luke Cwik <lcwik@google.com> wrote:
> Yeah, the "or state is committed" means that we should resolve it before any additional writes but that only addresses part of the surprise I had.
> 
> I would have expected that the putIfAbsent would force a resolution before transitioning to process the next key and window pair regardless of whether the read() was invoked or there were additional writes.
> 
> 
> On Mon, Aug 16, 2021 at 10:04 AM Kenneth Knowles <kenn@apache.org> wrote:
> I think it is an implementation bug. The javadoc for putIfAbsent says "When {@code read()} is called on the result or state is committed, it forces a read of the map and reconciliation with any pending modifications."
> 
> My reading of this is that the value changes for anything that happens after the call to putIfAbsent. It would be good to make this clear, that any subsequent observation of this cell should observe the new value.
> 
> Kenn
> 
> On Fri, Aug 13, 2021 at 9:32 AM Reuven Lax <relax@google.com> wrote:
> Yeah - I remember thinking that the computeIfAbsent/putIfAbsent semantics were very weird. I almost would have preferred not having those methods in MapState, even though they can be useful.
> 
> On Fri, Aug 13, 2021 at 9:21 AM Luke Cwik <lcwik@google.com> wrote:
> I was surprised to see the MapState API for computeIfAbsent/putIfAbsent only performs the write if the ReadableState that is returned is resolved.
> 
> For example:
> ReadableState<String> value = mapState.putIfAbsent("key", "new value maybe");
> does nothing until
> value.read();
> 
> It would seem like an obvious mistake for developers to make to forget to do the value.read() when coming from the Java Map API which always performs the write.


Re: MapState API

Posted by Luke Cwik <lc...@google.com>.
The underlying state implementation that the FnApiDoFnRunner uses has a
"close" method specifically meant to flush any pending operations, so that
part should be trivial. I don't think persisting the "changelog" would be
difficult, but migrating to it in the future would cause issues for job
update since we would be changing the encoding, so deciding upfront which
approach we want would be useful.
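
For reference, the resolution rule for a persisted changelog proposed in the quoted messages below (last Put wins, otherwise the first PutIfAbsent) could be sketched roughly as follows. This is plain Java, not Beam code: ChangelogResolver, Change, and Operation are hypothetical names, values are assumed to be non-null, and remove() is not modeled.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical model of resolving a per-key changelog; not part of Beam. */
class ChangelogResolver {
  enum Operation { PUT, PUT_IF_ABSENT }

  /** One journaled change for a single map key. */
  static final class Change<V> {
    final Operation op;
    final V value;

    Change(Operation op, V value) {
      this.op = op;
      this.value = value;
    }
  }

  /**
   * Scans the log from start to end. The last PUT wins; if there is no PUT,
   * the first PUT_IF_ABSENT wins; an empty log means the key is absent.
   */
  static <V> Optional<V> resolve(List<Change<V>> log) {
    V lastPut = null;
    boolean sawPut = false;
    V firstPutIfAbsent = null;
    boolean sawPutIfAbsent = false;
    for (Change<V> change : log) {
      if (change.op == Operation.PUT) {
        lastPut = change.value;
        sawPut = true;
      } else if (!sawPutIfAbsent) {
        firstPutIfAbsent = change.value;
        sawPutIfAbsent = true;
      }
    }
    if (sawPut) {
      return Optional.of(lastPut);
    }
    return sawPutIfAbsent ? Optional.of(firstPutIfAbsent) : Optional.empty();
  }
}
```

Whatever shape a Change takes is what would end up encoded in persisted state, which is why the choice matters for job update.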



On Thu, Sep 9, 2021 at 9:14 AM Reuven Lax <re...@google.com> wrote:

> I think a changelog could be kept in memory - persisting a changelog seems
> overly complex. We would have to make sure to flush the changelog at the
> end of every bundle though, so it would only help if the map key was
> accessed multiple times in the same bundle. I don't think it's correct that
> we won't be able to remove the key. In your example, all of the calls are
> in a single processElement or a single bundle (I'm not sure if it's legal
> to maintain ReadableState references between elements within the same
> bundle though). In this case the call to remove would also be an operation
> journaled into the log, so after flushing the whole log at the end of the
> bundle the key would be removed.
>
> I don't think the above would be that hard, but IMO it's also fine to do
> something simpler initially.
>
> Reuven
>
> On Wed, Sep 8, 2021 at 4:13 PM Kiley Sok <ki...@google.com> wrote:
>
>> Is adding the complexity of the changelog worth it instead of resolving
>> immediately? We would be storing much more persisted data and wouldn't be
>> able to clear the key even on a remove().
>>
>> For example:
>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>> mapState.remove(keyA); // still need to keep the changelog
>> V maybePut2Value = maybePut2.read(); // should return value1
>>
>> On Wed, Sep 8, 2021 at 1:15 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> I believe we should consider these bugs and fix them since they are
>>> surprising behaviors for users based upon what people are used to from
>>> similar APIs in other Map constructs.
>>>
>>> The benefit of the changelog approach is to allow for putIfAbsent to
>>> have a blind append if read is never called. To expand on my example of
>>> putIfAbsent:
>>> First Bundle (assuming no read in this bundle):
>>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);  //
>>> Ignore this since we now know that the map has an entry
>>> Produces one blind append of (keyA, (PutIfAbsent, value1))
>>>
>>> Second Bundle:
>>> ReadableState<V> get = mapState.get(keyA);
>>> V value = get.read();
>>> The underlying values will be a list of changes applied in order to this
>>> map. The first read that sees multiple values should clear the list and
>>> resolve it similar to what a combining state does, returning the only Put
>>> (since Put = clear + append) in the list or the first PutIfAbsent.
>>>
>>> On Wed, Sep 8, 2021 at 11:51 AM Kiley Sok <ki...@google.com> wrote:
>>>
>>>> Would this be a breaking change then? Going by your first examples,
>>>> it's no longer a *deferred* read-then-write.
>>>>
>>>> I'm not seeing the benefit of having a changelog. If I'm reading
>>>> your examples correctly, it's saying it should evaluate putIfAbsent
>>>> immediately and store the result in ReadableState until read is called?
>>>>
>>>> On Wed, Sep 8, 2021 at 8:54 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> I agree with the comment that you linked to Kiley and your analysis of
>>>>> what happens today. Yes I believe the write will not happen until the
>>>>> read() is invoked on ReadableState<V> returned from the putIfAbsent(). The
>>>>> rest of the comment also says that if there are multiple putIfAbsent calls
>>>>> then which one is written is dependent on which ReadableState<V> read() is
>>>>> called leading to this behavior:
>>>>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>>>>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>>>>> maybePut2.read(); // If keyA was absent, then it will have value2
>>>>> otherwise it will maintain its old value.
>>>>> maybePut1.read(); // This is effectively ignored
>>>>>
>>>>> I started this thread because a lot of these behaviors are surprising
>>>>> and we should fix them to have ordering semantics based upon the actual
>>>>> order of interactions with the MapState and not the current semantics which
>>>>> rely on read() being invoked.
>>>>>
>>>>> Conveniently we don't have to resolve the read() immediately after a
>>>>> put/putIfAbsent and can still have a blind append on to a multimap state.
>>>>> We can treat the underlying value store as a changelog which we can figure
>>>>> out the true value when performing a future read if we store <Operation,
>>>>> Value> (where Operation is Put or PutIfAbsent) and can read from the start
>>>>> to the end looking for the last Put in the list and if none exists the
>>>>> first PutIfAbsent.
>>>>>
>>>>> On Tue, Sep 7, 2021 at 6:26 PM Kiley Sok <ki...@google.com> wrote:
>>>>>
>>>>>> As a side note, SEO needs some work. The first result for "mapstate
>>>>>> beam" on Google is from version 2.2.0
>>>>>>
>>>>>> On Tue, Sep 7, 2021 at 6:15 PM Kiley Sok <ki...@google.com> wrote:
>>>>>>
>>>>>>> This section of the comment [1] says that calling read() is required
>>>>>>> for the condition to be evaluated.
>>>>>>>
>>>>>>> Example in the comment is:
>>>>>>> ReadableState<V> maybePut = mapState.putIfAbsent(keyA, valueA);
>>>>>>> mapState.remove(keyA);
>>>>>>> maybePut.read(); // writes to map
>>>>>>>
>>>>>>>  Also, it states that it should be a deferred
>>>>>>> read-followed-by-write. Deferred until read is called?
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java#L60-L64
>>>>>>>
>>>>>>> On Tue, Aug 17, 2021 at 1:21 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> I believe that all our ReadableState calls are point in time
>>>>>>>> snapshots of what the underlying state is plus any changes that happened
>>>>>>>> within the bundle already (e.g. you can see your own writes/appends/clears
>>>>>>>> assuming that the creation of the ReadableState happened after the
>>>>>>>> mutation).
>>>>>>>>
>>>>>>>> Here are some examples of what I think should happen:
>>>>>>>>
>>>>>>>> #1
>>>>>>>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>>>>>>>> ReadableState<T> read = mapState.get(keyA);   // value should equal
>>>>>>>> "valueA"
>>>>>>>>
>>>>>>>> #2
>>>>>>>> ReadableState<T> read = mapState.get(keyA);   // read should equal
>>>>>>>> prior value
>>>>>>>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA,
>>>>>>>> valueA);
>>>>>>>>
>>>>>>>> #3
>>>>>>>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA,
>>>>>>>> valueA);
>>>>>>>> // any calls to MapState put/remove for keyA can effectively drop
>>>>>>>> the write that would be pending via maybePut
>>>>>>>> // maybePut should also be resolved before the write happens to
>>>>>>>> ensure that it can return true/false truthfully if the state was empty
>>>>>>>>
>>>>>>>> #4
>>>>>>>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>>>>>>>> // in a future bundle for same key and window
>>>>>>>> ReadableState<T> read = mapState.get(keyA);   // value should equal
>>>>>>>> "valueA"
>>>>>>>>
>>>>>>>> I think #1, #3, and #4 don't currently work and #2 doesn't work if
>>>>>>>> the mapState didn't contain keyA
>>>>>>>>
>>>>>>>> On Tue, Aug 17, 2021 at 12:28 PM Kenneth Knowles <ke...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Can the change be observed in that case? I think the semantics
>>>>>>>>> should be that the timing of the actual write cannot be observed.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Tue, Aug 17, 2021 at 10:19 AM Luke Cwik <lc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Yeah, the "or state is committed" means that we should resolve it
>>>>>>>>>> before any additional writes but that only addresses part of the surprise I
>>>>>>>>>> had.
>>>>>>>>>>
>>>>>>>>>> I would have expected that the putIfAbsent would force a
>>>>>>>>>> resolution before transitioning to process the next key and window pair
>>>>>>>>>> regardless of whether the read() was invoked or there were additional writes.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Aug 16, 2021 at 10:04 AM Kenneth Knowles <ke...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think it is an implementation bug. The javadoc for putIfAbsent
>>>>>>>>>>> says "When {@code read()} is called on the result or state is committed, it
>>>>>>>>>>> forces a read of the map and reconciliation with any pending modifications."
>>>>>>>>>>>
>>>>>>>>>>> My reading of this is that the value changes for anything that
>>>>>>>>>>> happens after the call to putIfAbsent. It would be good to make this clear,
>>>>>>>>>>> that any subsequent observation of this cell should observe the new value.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 13, 2021 at 9:32 AM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yeah - I remember thinking that the computeIfAbsent/putIfAbsent
>>>>>>>>>>>> semantics were very weird. I almost would have preferred not having those
>>>>>>>>>>>> methods in MapState, even though they can be useful.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 13, 2021 at 9:21 AM Luke Cwik <lc...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I was surprised to see the MapState API for
>>>>>>>>>>>>> computeIfAbsent/putIfAbsent only performs the write if the ReadableState
>>>>>>>>>>>>> that is returned is resolved.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For example:
>>>>>>>>>>>>> ReadableState<String> value = mapState.putIfAbsent("key", "new
>>>>>>>>>>>>> value maybe");
>>>>>>>>>>>>> does nothing until
>>>>>>>>>>>>> value.read();
>>>>>>>>>>>>>
>>>>>>>>>>>>> It would seem like an obvious mistake for developers to make
>>>>>>>>>>>>> to forget to do the value.read() when coming from the Java Map API which
>>>>>>>>>>>>> always performs the write.
>>>>>>>>>>>>>
>>>>>>>>>>>>

Re: MapState API

Posted by Reuven Lax <re...@google.com>.
I think a changelog could be kept in memory - persisting a changelog seems
overly complex. We would have to make sure to flush the changelog at the
end of every bundle though, so it would only help if the map key was
accessed multiple times in the same bundle. I don't think it's correct that
we won't be able to remove the key. In your example, all of the calls are
in a single processElement or a single bundle (I'm not sure if it's legal
to maintain ReadableState references between elements within the same
bundle though). In this case the call to remove would also be an operation
journaled into the log, so after flushing the whole log at the end of the
bundle the key would be removed.
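
A minimal sketch of the in-memory variant, assuming a plain java.util.Map stands in for the runner's persisted state. JournaledMap and flush() are hypothetical names, not Beam APIs; a real implementation would call the flush from whatever hook runs at the end of the bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical model: journals map operations in memory and replays them on flush. */
class JournaledMap<K, V> {
  enum Op { PUT, PUT_IF_ABSENT, REMOVE }

  /** One journaled operation; value is null for REMOVE. */
  private static final class Entry<K, V> {
    final Op op;
    final K key;
    final V value;

    Entry(Op op, K key, V value) {
      this.op = op;
      this.key = key;
      this.value = value;
    }
  }

  // Stands in for the persisted state; only touched by flush().
  private final Map<K, V> persisted;
  private final List<Entry<K, V>> log = new ArrayList<>();

  JournaledMap(Map<K, V> persisted) {
    this.persisted = persisted;
  }

  void put(K key, V value) {
    log.add(new Entry<>(Op.PUT, key, value));
  }

  void putIfAbsent(K key, V value) {
    log.add(new Entry<>(Op.PUT_IF_ABSENT, key, value));
  }

  void remove(K key) {
    log.add(new Entry<>(Op.REMOVE, key, null));
  }

  /** Replays the journal in call order against the backing map, then clears it. */
  void flush() {
    for (Entry<K, V> e : log) {
      switch (e.op) {
        case PUT:
          persisted.put(e.key, e.value);
          break;
        case PUT_IF_ABSENT:
          persisted.putIfAbsent(e.key, e.value);
          break;
        case REMOVE:
          persisted.remove(e.key);
          break;
      }
    }
    log.clear();
  }
}
```

With the sequence from the quoted example (two putIfAbsent calls on keyA followed by remove(keyA)), nothing is written until flush(), and after the flush keyA is gone from the backing map.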

I don't think the above would be that hard, but IMO it's also fine to do
something simpler initially.

Reuven

On Wed, Sep 8, 2021 at 4:13 PM Kiley Sok <ki...@google.com> wrote:

> Is adding the complexity of the changelog worth it instead of resolving
> immediately? We would be storing much more persisted data and wouldn't be
> able to clear the key even on a remove().
>
> For example:
> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
> mapState.remove(keyA); // still need to keep the changelog
> V maybePut2Value = maybePut2.read(); // should return value1
>
> On Wed, Sep 8, 2021 at 1:15 PM Luke Cwik <lc...@google.com> wrote:
>
>> I believe we should consider these bugs and fix them since they are
>> surprising behaviors for users based upon what people are used to from
>> similar APIs in other Map constructs.
>>
>> The benefit of the changelog approach is to allow for putIfAbsent to have
>> a blind append if read is never called. To expand on my example of
>> putIfAbsent:
>> First Bundle (assuming no read in this bundle):
>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);  //
>> Ignore this since we now know that the map has an entry
>> Produces one blind append of (keyA, (PutIfAbsent, value1))
>>
>> Second Bundle:
>> ReadableState<V> get = mapState.get(keyA);
>> V value = get.read();
>> The underlying values will be a list of changes applied in order to this
>> map. The first read that sees multiple values should clear the list and
>> resolve it similar to what a combining state does, returning the only Put
>> (since Put = clear + append) in the list or the first PutIfAbsent.
>>
>> On Wed, Sep 8, 2021 at 11:51 AM Kiley Sok <ki...@google.com> wrote:
>>
>>> Would this be a breaking change then? Going by your first examples, it's
>>> no longer a *deferred* read-then-write.
>>>
>>> I'm not seeing the benefit of having a changelog. If I'm reading your
>>> examples correctly, it's saying it should evaluate putIfAbsent immediately
>>> and store the result in ReadableState until read is called?
>>>
>>> On Wed, Sep 8, 2021 at 8:54 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> I agree with the comment that you linked to Kiley and your analysis of
>>>> what happens today. Yes I believe the write will not happen until the
>>>> read() is invoked on ReadableState<V> returned from the putIfAbsent(). The
>>>> rest of the comment also says that if there are multiple putIfAbsent calls
>>>> then which one is written is dependent on which ReadableState<V> read() is
>>>> called leading to this behavior:
>>>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>>>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>>>> maybePut2.read(); // If keyA was absent, then it will have value2
>>>> otherwise it will maintain its old value.
>>>> maybePut1.read(); // This is effectively ignored
>>>>
>>>> I started this thread because a lot of these behaviors are surprising
>>>> and we should fix them to have ordering semantics based upon the actual
>>>> order of interactions with the MapState and not the current semantics which
>>>> rely on read() being invoked.
>>>>
>>>> Conveniently we don't have to resolve the read() immediately after a
>>>> put/putIfAbsent and can still have a blind append on to a multimap state.
>>>> We can treat the underlying value store as a changelog which we can figure
>>>> out the true value when performing a future read if we store <Operation,
>>>> Value> (where Operation is Put or PutIfAbsent) and can read from the start
>>>> to the end looking for the last Put in the list and if none exists the
>>>> first PutIfAbsent.
>>>>
>>>> On Tue, Sep 7, 2021 at 6:26 PM Kiley Sok <ki...@google.com> wrote:
>>>>
>>>>> As a side note, SEO needs some work. The first result for "mapstate
>>>>> beam" on Google is from version 2.2.0
>>>>>
>>>>> On Tue, Sep 7, 2021 at 6:15 PM Kiley Sok <ki...@google.com> wrote:
>>>>>
>>>>>> This section of the comment [1] says that calling read() is required
>>>>>> for the condition to be evaluated.
>>>>>>
>>>>>> Example in the comment is:
>>>>>> ReadableState<V> maybePut = mapState.putIfAbsent(keyA, valueA);
>>>>>> mapState.remove(keyA);
>>>>>> maybePut.read(); // writes to map
>>>>>>
>>>>>>  Also, it states that it should be a deferred read-followed-by-write.
>>>>>> Deferred until read is called?
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java#L60-L64
>>>>>>
>>>>>> On Tue, Aug 17, 2021 at 1:21 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> I believe that all our ReadableState calls are point in time
>>>>>>> snapshots of what the underlying state is plus any changes that happened
>>>>>>> within the bundle already (e.g. you can see your own writes/appends/clears
>>>>>>> assuming that the creation of the ReadableState happened after the
>>>>>>> mutation).
>>>>>>>
>>>>>>> Here are some examples of what I think should happen:
>>>>>>>
>>>>>>> #1
>>>>>>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>>>>>>> ReadableState<T> read = mapState.get(keyA);   // value should equal
>>>>>>> "valueA"
>>>>>>>
>>>>>>> #2
>>>>>>> ReadableState<T> read = mapState.get(keyA);   // read should equal
>>>>>>> prior value
>>>>>>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>>>>>>>
>>>>>>> #3
>>>>>>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>>>>>>> // any calls to MapState put/remove for keyA can effectively drop
>>>>>>> the write that would be pending via maybePut
>>>>>>> // maybePut should also be resolved before the write happens to
>>>>>>> ensure that it can return true/false truthfully if the state was empty
>>>>>>>
>>>>>>> #4
>>>>>>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>>>>>>> // in a future bundle for same key and window
>>>>>>> ReadableState<T> read = mapState.get(keyA);   // value should equal
>>>>>>> "valueA"
>>>>>>>
>>>>>>> I think #1, #3, and #4 don't currently work and #2 doesn't work if
>>>>>>> the mapState didn't contain keyA
>>>>>>>
>>>>>>> On Tue, Aug 17, 2021 at 12:28 PM Kenneth Knowles <ke...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Can the change be observed in that case? I think the semantics
>>>>>>>> should be that the timing of the actual write cannot be observed.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Tue, Aug 17, 2021 at 10:19 AM Luke Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yeah, the "or state is committed" means that we should resolve it
>>>>>>>>> before any additional writes but that only addresses part of the surprise I
>>>>>>>>> had.
>>>>>>>>>
>>>>>>>>> I would have expected that the putIfAbsent would force a
>>>>>>>>> resolution before transitioning to process the next key and window pair
>>>>>>>>> regardless of whether the read() was invoked or there were additional writes.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Aug 16, 2021 at 10:04 AM Kenneth Knowles <ke...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I think it is an implementation bug. The javadoc for putIfAbsent
>>>>>>>>>> says "When {@code read()} is called on the result or state is committed, it
>>>>>>>>>> forces a read of the map and reconciliation with any pending modifications."
>>>>>>>>>>
>>>>>>>>>> My reading of this is that the value changes for anything that
>>>>>>>>>> happens after the call to putIfAbsent. It would be good to make this clear,
>>>>>>>>>> that any subsequent observation of this cell should observe the new value.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 13, 2021 at 9:32 AM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yeah - I remember thinking that the computeIfAbsent/putIfAbsent
>>>>>>>>>>> semantics were very weird. I almost would have preferred not having those
>>>>>>>>>>> methods in MapState, even though they can be useful.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 13, 2021 at 9:21 AM Luke Cwik <lc...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I was surprised to see the MapState API for
>>>>>>>>>>>> computeIfAbsent/putIfAbsent only performs the write if the ReadableState
>>>>>>>>>>>> that is returned is resolved.
>>>>>>>>>>>>
>>>>>>>>>>>> For example:
>>>>>>>>>>>> ReadableState<String> value = mapState.putIfAbsent("key", "new
>>>>>>>>>>>> value maybe");
>>>>>>>>>>>> does nothing until
>>>>>>>>>>>> value.read();
>>>>>>>>>>>>
>>>>>>>>>>>> It would seem like an obvious mistake for developers to make to
>>>>>>>>>>>> forget to do the value.read() when coming from the Java Map API which
>>>>>>>>>>>> always performs the write.
>>>>>>>>>>>>
>>>>>>>>>>>

Re: MapState API

Posted by Kiley Sok <ki...@google.com>.
Is adding the complexity of the changelog worth it instead of resolving
immediately? We would be storing much more persisted data and wouldn't be
able to clear the key even on a remove().

For example:
ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
mapState.remove(keyA); // still need to keep the changelog
V maybePut2Value = maybePut2.read(); // should return value1
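
For comparison, resolving immediately could look roughly like the sketch below. It is a plain-Java model, not Beam code: EagerMap is a hypothetical name, a HashMap stands in for the bundle's view of the state, and Supplier<V> stands in for ReadableState<V>. The value handed back is fixed at call time, so a later remove() does not change it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical model of "resolve immediately" semantics; not part of Beam. */
class EagerMap<K, V> {
  // Stands in for the bundle-local view of the map state.
  private final Map<K, V> view = new HashMap<>();

  /**
   * Applies putIfAbsent right away. The returned supplier yields the value
   * that was present before this call, or null if the key was absent.
   */
  Supplier<V> putIfAbsent(K key, V value) {
    V previous = view.putIfAbsent(key, value);
    return () -> previous;
  }

  void remove(K key) {
    view.remove(key);
  }

  V get(K key) {
    return view.get(key);
  }
}
```

Run against the example above on an initially empty map, the second putIfAbsent captures value1, and reading it after remove(keyA) still returns value1 without any changelog being kept.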

Re: MapState API

Posted by Luke Cwik <lc...@google.com>.
I believe we should consider these bugs and fix them since they are
surprising behaviors for users based upon what people are used to from
similar APIs in other Map constructs.

The benefit of the changelog approach is to allow for putIfAbsent to have a
blind append if read is never called. To expand on my example of
putIfAbsent:
First Bundle (assuming no read in this bundle):
ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
// The next call is ignored since we now know that the map has an entry
ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
Produces one blind append of (keyA, (PutIfAbsent, value1))

Second Bundle:
ReadableState<V> get = mapState.get(keyA);
V value = get.read();
The underlying values will be a list of changes applied in order to this
map. The first read that sees multiple values should clear the list and
resolve it, similar to what a combining state does, returning the only Put
(since Put = clear + append) in the list or, if there is no Put, the first
PutIfAbsent.
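
As a rough sketch of the write side of the first bundle, under the
assumption that the runner can append to a per-key list without reading it:
the class BlindAppendBundleSketch, its fields, and the string encoding of an
entry are all hypothetical, and the compaction on first read is not modeled
here. Only the first putIfAbsent per key is buffered, and committing the
bundle emits one append.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (an assumption, not Beam code) of one bundle's pending putIfAbsent calls. */
final class BlindAppendBundleSketch {
  /** Hypothetical stand-in for the persisted changelog, one string per append. */
  final List<String> persistedAppends = new ArrayList<>();

  /** First putIfAbsent value seen per key in the current bundle. */
  private final Map<String, String> pending = new LinkedHashMap<>();

  void putIfAbsent(String key, String value) {
    // Later calls for the same key are dropped: the bundle already knows
    // that the map will have an entry for this key.
    pending.putIfAbsent(key, value);
  }

  /** End of bundle: append the buffered entries without reading existing state. */
  void commitBundle() {
    for (Map.Entry<String, String> e : pending.entrySet()) {
      persistedAppends.add(e.getKey() + "=(PutIfAbsent, " + e.getValue() + ")");
    }
    pending.clear();
  }

  public static void main(String[] args) {
    BlindAppendBundleSketch bundle = new BlindAppendBundleSketch();
    bundle.putIfAbsent("keyA", "value1");
    bundle.putIfAbsent("keyA", "value2"); // ignored
    bundle.commitBundle();
    System.out.println(bundle.persistedAppends); // [keyA=(PutIfAbsent, value1)]
  }
}
```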

Re: MapState API

Posted by Kiley Sok <ki...@google.com>.
Would this be a breaking change then? Going by your first examples, it's no
longer a *deferred* read-then-write.

I'm not seeing the benefit of having a changelog. If I'm reading your
examples correctly, it's saying it should evaluate putIfAbsent immediately
and store the result in ReadableState until read is called?

Re: MapState API

Posted by Luke Cwik <lc...@google.com>.
I agree with the comment that you linked to, Kiley, and with your analysis of
what happens today. Yes, I believe the write will not happen until read() is
invoked on the ReadableState<V> returned from putIfAbsent(). The rest of
the comment also says that if there are multiple putIfAbsent calls, then
which one is written depends on which ReadableState<V> has read() called
on it, leading to this behavior:
ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
// If keyA was absent, then it will have value2; otherwise it will maintain its old value.
maybePut2.read();
maybePut1.read(); // This is effectively ignored
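
To make that ordering concrete, here is a small toy model of the deferred
behavior described above. It is a sketch under assumptions, not Beam's
implementation: DeferredPutIfAbsentSketch is a hypothetical name, a HashMap
stands in for the state backend, and java.util.function.Supplier stands in
for ReadableState, with get() playing the role of read().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Toy model (an assumption, not Beam code): the write only happens when the handle is read. */
final class DeferredPutIfAbsentSketch {
  private final Map<String, String> map = new HashMap<>();

  /** Returns a handle; the map is not touched until the handle's get() is called. */
  Supplier<String> putIfAbsent(String key, String value) {
    return () -> map.putIfAbsent(key, value);
  }

  String get(String key) {
    return map.get(key);
  }

  public static void main(String[] args) {
    DeferredPutIfAbsentSketch state = new DeferredPutIfAbsentSketch();
    Supplier<String> maybePut1 = state.putIfAbsent("keyA", "value1");
    Supplier<String> maybePut2 = state.putIfAbsent("keyA", "value2");
    System.out.println(state.get("keyA")); // null: nothing has been written yet
    maybePut2.get(); // writes value2 because keyA was absent
    maybePut1.get(); // effectively ignored: keyA is now present
    System.out.println(state.get("keyA")); // value2
  }
}
```

In this model, forgetting to read either handle leaves the map unchanged,
which is the surprise that started the thread.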

I started this thread because a lot of these behaviors are surprising and
we should fix them to have ordering semantics based upon the actual order
of interactions with the MapState and not the current semantics which rely
on read() being invoked.

Conveniently, we don't have to resolve the read() immediately after a
put/putIfAbsent and can still have a blind append on to a multimap state.
We can treat the underlying value store as a changelog from which we can
figure out the true value when performing a future read: if we store
<Operation, Value> (where Operation is Put or PutIfAbsent), we can read from
the start to the end looking for the last Put in the list and, if none
exists, the first PutIfAbsent.
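
The read-side rule can be sketched as a pure function over the list of
<Operation, Value> entries for one key. This is only an illustration of the
rule as stated: the names ChangelogResolveSketch, Operation, Entry and
resolve are hypothetical, values are plain strings, and remove() is not
modeled.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model (an assumption, not Beam code) of resolving one key's changelog. */
final class ChangelogResolveSketch {
  enum Operation { PUT, PUT_IF_ABSENT }

  static final class Entry {
    final Operation op;
    final String value;

    Entry(Operation op, String value) {
      this.op = op;
      this.value = value;
    }
  }

  /** Returns the last PUT value if any, else the first PUT_IF_ABSENT value, else null. */
  static String resolve(List<Entry> changelog) {
    String lastPut = null;
    boolean sawPut = false;
    String firstPutIfAbsent = null;
    boolean sawPutIfAbsent = false;
    for (Entry e : changelog) {
      if (e.op == Operation.PUT) {
        lastPut = e.value; // a later Put always overrides an earlier one
        sawPut = true;
      } else if (!sawPutIfAbsent) {
        firstPutIfAbsent = e.value; // only the first conditional write can win
        sawPutIfAbsent = true;
      }
    }
    return sawPut ? lastPut : firstPutIfAbsent;
  }

  public static void main(String[] args) {
    List<Entry> onlyConditional = Arrays.asList(
        new Entry(Operation.PUT_IF_ABSENT, "value1"),
        new Entry(Operation.PUT_IF_ABSENT, "value2"));
    List<Entry> withPut = Arrays.asList(
        new Entry(Operation.PUT_IF_ABSENT, "value1"),
        new Entry(Operation.PUT, "value3"),
        new Entry(Operation.PUT_IF_ABSENT, "value4"));
    System.out.println(resolve(onlyConditional)); // value1
    System.out.println(resolve(withPut)); // value3
  }
}
```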

Re: MapState API

Posted by Brian Hulette <bh...@google.com>.
On Tue, Sep 7, 2021 at 6:26 PM Kiley Sok <ki...@google.com> wrote:

> As a side note, SEO needs some work. The first result for "mapstate beam"
> on Google is from version 2.2.0
>

See this dev@ proposal [1] and jira [2].

[1]
https://lists.apache.org/thread.html/rc23610839204f2bba1ea148885ef9f496a3087a47d7bb25662a46bc5%40%3Cdev.beam.apache.org%3E
[2] https://issues.apache.org/jira/browse/BEAM-12777


Re: MapState API

Posted by Kiley Sok <ki...@google.com>.
As a side note, SEO needs some work. The first result for "mapstate beam"
on Google is from version 2.2.0.

On Tue, Sep 7, 2021 at 6:15 PM Kiley Sok <ki...@google.com> wrote:

> This section of the comment [1] says that calling read() is required for
> the condition to be evaluated.
>
> Example in the comment is:
> ReadableState<V> maybePut = mapState.putIfAbsent(keyA, valueA);
> mapState.remove(keyA);
> maybePut.read(); // writes to map
>
>  Also, it states that it should be a deferred read-followed-by-write.
> Deferred until read is called?
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java#L60-L64
>
> On Tue, Aug 17, 2021 at 1:21 PM Luke Cwik <lc...@google.com> wrote:
>
>> I believe that all our ReadableState calls are point in time snapshots of
>> what the underlying state is plus any changes that happened within the
>> bundle already (e.g. you can see your own writes/appends/clears assuming
>> that the creation of the ReadableState happened after the mutation).
>>
>> Here are some examples of what I think should happen:
>>
>> #1
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>> ReadableState<T> read = mapState.get(keyA);   // value should equal "valueA"
>>
>> #2
>> ReadableState<T> read = mapState.get(keyA);   // read should equal prior value
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>>
>> #3
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>> // any calls to MapState put/remove for keyA can effectively drop the
>> // write that would be pending via maybePut
>> // maybePut should also be resolved before the write happens to ensure
>> // that it can return true/false truthfully if the state was empty
>>
>> #4
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>> // in a future bundle for same key and window
>> ReadableState<T> read = mapState.get(keyA);   // value should equal "valueA"
>>
>> I think #1, #3, and #4 don't currently work and #2 doesn't work if the
>> mapState didn't contain keyA
>>
>> On Tue, Aug 17, 2021 at 12:28 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> Can the change be observed in that case? I think the semantics should be
>>> that the timing of the actual write cannot be observed.
>>>
>>> Kenn
>>>
>>> On Tue, Aug 17, 2021 at 10:19 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> Yeah, the "or state is committed" means that we should resolve it
>>>> before any additional writes but that only addresses part of the surprise I
>>>> had.
>>>>
>>>> I would have expected that the putIfAbsent would force a resolution
>>>> before transitioning to process the next key and window pair, regardless
>>>> of whether read() was invoked or there were additional writes.
>>>>
>>>>
>>>> On Mon, Aug 16, 2021 at 10:04 AM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>>
>>>>> I think it is an implementation bug. The javadoc for putIfAbsent says
>>>>> "When {@code read()} is called on the result or state is committed, it
>>>>> forces a read of the map and reconciliation with any pending modifications."
>>>>>
>>>>> My reading of this is that the value changes for anything that happens
>>>>> after the call to putIfAbsent. It would be good to make this clear, that
>>>>> any subsequent observation of this cell should observe the new value.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Aug 13, 2021 at 9:32 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Yeah - I remember thinking that the computeIfAbsent/putIfAbsent
>>>>>> semantics were very weird. I almost would have preferred not having those
>>>>>> methods in MapState, even though they can be useful.
>>>>>>
>>>>>> On Fri, Aug 13, 2021 at 9:21 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> I was surprised to see the MapState API for
>>>>>>> computeIfAbsent/putIfAbsent only performs the write if the ReadableState
>>>>>>> that is returned is resolved.
>>>>>>>
>>>>>>> For example:
>>>>>>> ReadableState<String> value = mapState.putIfAbsent("key", "new value maybe");
>>>>>>> does nothing until
>>>>>>> value.read();
>>>>>>>
>>>>>>> It would seem like an obvious mistake for developers to make to
>>>>>>> forget to do the value.read() when coming from the Java Map API which
>>>>>>> always performs the write.
>>>>>>>
>>>>>>
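
To make the behaviour discussed in this thread concrete, here is a minimal
toy model in plain Java. It is not Beam code: DeferredPutIfAbsentSketch, its
Supplier-based handle, and the three scenario methods are hypothetical names
made up for illustration, and get() on the handle merely stands in for
ReadableState.read(). It only mimics the lazy behaviour described above, where
the conditional write happens when the returned handle is read, and says
nothing about how any runner actually implements MapState.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy model only; NOT Beam's MapState. All names here are hypothetical.
final class DeferredPutIfAbsentSketch {
  private final Map<String, String> backing = new HashMap<>();

  // Returns a handle. The conditional write runs only when the handle's
  // get() is called (get() stands in for ReadableState.read()).
  Supplier<String> putIfAbsent(String key, String value) {
    return () -> backing.putIfAbsent(key, value);
  }

  void remove(String key) {
    backing.remove(key);
  }

  String get(String key) {
    return backing.get(key);
  }

  // The handle is dropped without being read, so no write ever happens.
  static String forgottenRead() {
    DeferredPutIfAbsentSketch state = new DeferredPutIfAbsentSketch();
    state.putIfAbsent("key", "new value maybe");
    return state.get("key");
  }

  // Reading the handle performs the deferred write.
  static String withRead() {
    DeferredPutIfAbsentSketch state = new DeferredPutIfAbsentSketch();
    state.putIfAbsent("key", "new value maybe").get();
    return state.get("key");
  }

  // Mirrors the example quoted from the javadoc comment: the remove happens
  // first, then the late read writes the value into the map.
  static String javadocExample() {
    DeferredPutIfAbsentSketch state = new DeferredPutIfAbsentSketch();
    Supplier<String> maybePut = state.putIfAbsent("keyA", "valueA");
    state.remove("keyA");
    maybePut.get(); // writes to map
    return state.get("keyA");
  }

  public static void main(String[] args) {
    System.out.println("forgotten read:  " + forgottenRead());
    System.out.println("with read:       " + withRead());
    System.out.println("javadoc example: " + javadocExample());
  }
}
```

In this model the first scenario leaves the key unset, which is the surprise
for anyone coming from java.util.Map, where putIfAbsent always performs the
write immediately.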