Posted to dev@beam.apache.org by Catlyn Kong <ca...@yelp.com> on 2020/08/03 21:41:46 UTC

Re: [External] Re: DISCUSS: Sorted MapState API

Hey folks,

Sorry I'm late to this thread, but this might be very helpful for the problem
we're dealing with. Do we have a design doc or a Jira ticket I can follow?

Cheers,
Catlyn

On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský <je...@seznam.cz> wrote:

> My questions were just an example. I fully agree there is a fundamental
> need for a sorted state (of some form, and I also think this links to
> efficient implementation of retractions) - I was reacting to Kenn's question
> about BIP. This one would be a pretty nice example of why it would be good to
> have such a "process" - not everything can be solved on the ML and there are
> fundamental decisions that might need closer attention.
> On 6/18/20 5:28 PM, Reuven Lax wrote:
>
> Jan - my proposal is exactly TimeSortedBagState (more accurately -
> TimeSortedListState), though I went a bit further and also proposed a way
> to have a dynamic number of tagged TimeSortedBagStates.
>
> You are correct that the runner doesn't really have to store the data time
> sorted - what's actually needed is the ability to fetch and remove
> timestamp ranges of data (though that does include fetching the entire
> list); TimeOrderedState is probably a more accurate name than
> TimeSortedState. I don't think we could get away with operations that only
> act on the smallest timestamp, however we could limit the API to only being
> able to fetch and remove prefixes of data (ordered by timestamp). However
> if we support prefixes, we might as well support arbitrary subranges.
>
> On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Big +1 for a BIP, as this might really help clarify all the pros and cons
>> of all possibilities. There seem to be questions that need answering and
>> motivating use cases - do we need sorted map state or can we solve our use
>> cases by something simpler - e.g. the mentioned TimeSortedBagState? Does
>> that really have to be time-sorted structure, or does it "only" have to
>> have operations that can efficiently find and remove element with smallest
>> timestamp (like a PriorityQueue)?
>>
>> Jan
>> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
>>
>> Zooming in from generic philosophy to be clear: adding time ordered
>> buffer to the Fn state API is *not* a shortcut. It has benefits that will
>> not be achieved by SDK-side implementation on top of either ordered or
>> unordered multimap. Are those benefits worth expanding the API? I don't
>> know.
>>
>> A change to allow a runner to have a specialized implementation for
>> time-buffered state would be one or more StateKey types, right? Reuven,
>> maybe put this and your Java API in a doc? A BIP? Seems like there's at
>> least the following to explore:
>>
>>  - how that Java API would map to an SDK-side implementation on top of
>> multimap state key
>>  - how that Java API would map to a new StateKey
>>  - whether there's actually more than one relevant implementation of that
>> StateKey
>>  - whether SDK-side implementation on some other state key would be
>> performant enough in all SDK languages (present and future)
>>
>> Zooming back out to generic philosophy: Proliferation of StateKey
>> types tuned by runners (which can very easily still share implementation)
>> is probably better than proliferation of complex SDK-side implementations
>> with varying completeness and performance.
>>
>> Kenn
>>
>> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <re...@google.com> wrote:
>>
>>> It might help for me to describe what I have in mind. I'm still
>>> proposing that we build multimap, just not a globally-sorted multimap.
>>>
>>> My previous proposal was that we provide a Multimap<Key, Value> state
>>> type, sorted by key. This would have two additional operations -
>>> multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey,
>>> endKey). The primary use case was timestamp sorting, but I felt that a
>>> sorted multimap provided a nice generalization - after all, you can simply
>>> key the multimap by timestamp to get timestamp sorting.
>>>
>>> This approach had some issues immediately that would take some work to
>>> solve. Since a multimap key can have any type and a runner will only be
>>> able to sort by encoded type, we would need to introduce a concept of
>>> order-preserving coders into Beam and plumb that through. Robert pointed
>>> out that even our existing standard coders for simple integral types don't
>>> preserve order, so there will likely be surprises here.
>>>
>>> My current proposal is for a multimap that is not sorted by key, but
>>> that can support ordered values for a single key. Remember that a multimap
>>> maps K -> Iterable<V>, so this means that each individual Iterable<V> is
>>> ordered, but the keys have no specific order relative to each other. This
>>> is not too different from many multimap implementations where the keys are
>>> unordered, but the list of values for a single key at least has a stable
>>> order.
>>>
>>> The interface would look like this:
>>>
>>> public interface MultimapState<K, V> extends State {
>>>   // Add a value with a default timestamp.
>>>   void put(K key, V value);
>>>
>>>   // Add a timestamped value.
>>>   void put(K key, TimestampedValue<V> value);
>>>
>>>   // Remove all values for a key.
>>>   void remove(K key);
>>>
>>>   // Remove all values for a key with timestamps within the specified range.
>>>   void removeRange(K key, Instant startTs, Instant endTs);
>>>
>>>   // Get an Iterable of values for a key. The Iterable will be returned
>>>   // sorted by timestamp.
>>>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
>>>
>>>   // Get an Iterable of values for a key in the specified range. The
>>>   // Iterable will be returned sorted by timestamp.
>>>   ReadableState<Iterable<TimestampedValue<V>>> getRange(
>>>       K key, Instant startTs, Instant endTs);
>>>
>>>   ReadableState<Iterable<K>> keys();
>>>   ReadableState<Iterable<TimestampedValue<V>>> values();
>>>   ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
>>> }
>>>
>>> We can of course provide helper functions that allow using MultimapState
>>> without dealing with TimestampedValue for users who only want a multimap and
>>> don't want sorting.
>>>
>>> I think many users will only need a single sorted list - not a full
>>> multimap. It's worth offering this as well, and we can simply build it on
>>> top of MultimapState. It will look like an extension of BagState:
>>>
>>> public interface TimestampSortedListState<T> extends State {
>>>   void add(TimestampedValue<T> value);
>>>   Iterable<TimestampedValue<T>> read();
>>>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant endTs);
>>>   void clearRange(Instant startTs, Instant endTs);
>>> }
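
To make the semantics of the TimestampSortedListState interface above concrete, here is a minimal in-memory sketch. It is an illustration only and not the Beam implementation: the class name InMemoryTimestampSortedList is hypothetical, timestamps are plain epoch-millisecond longs rather than Instant/TimestampedValue (so the snippet needs no Beam or Joda dependency), and ranges are assumed to be half-open, [startTs, endTs).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a timestamp-ordered list state. */
class InMemoryTimestampSortedList<T> {
  // A TreeMap keeps timestamps ordered; one timestamp may hold several values.
  private final TreeMap<Long, List<T>> byTimestamp = new TreeMap<>();

  /** Adds a value at the given timestamp (epoch millis, an assumption of this sketch). */
  void add(long timestampMillis, T value) {
    byTimestamp.computeIfAbsent(timestampMillis, ts -> new ArrayList<>()).add(value);
  }

  /** Returns all values ordered by timestamp. */
  List<T> read() {
    return flatten(byTimestamp.values());
  }

  /** Returns values with startTs <= timestamp < endTs, ordered by timestamp. */
  List<T> readRange(long startTs, long endTs) {
    return flatten(byTimestamp.subMap(startTs, true, endTs, false).values());
  }

  /** Removes values with startTs <= timestamp < endTs. */
  void clearRange(long startTs, long endTs) {
    // subMap returns a live view, so clearing it removes entries from the backing map.
    byTimestamp.subMap(startTs, true, endTs, false).clear();
  }

  private List<T> flatten(Iterable<List<T>> buckets) {
    List<T> out = new ArrayList<>();
    for (List<T> bucket : buckets) {
      out.addAll(bucket);
    }
    return out;
  }

  public static void main(String[] args) {
    InMemoryTimestampSortedList<String> state = new InMemoryTimestampSortedList<>();
    state.add(30L, "c");
    state.add(10L, "a");
    state.add(20L, "b");
    System.out.println(state.read());              // [a, b, c]
    System.out.println(state.readRange(10L, 30L)); // [a, b]
    state.clearRange(0L, 20L);
    System.out.println(state.read());              // [b, c]
  }
}
```

A runner would not need to hold the data sorted like this; the sketch only pins down what "fetch and remove timestamp ranges" means from the user's side.
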
>>>
>>>
>>> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> The portability layer is meant to live across multiple versions of Beam
>>>> and I don't think it should be approached by just doing the simple and useful
>>>> thing now, since I believe that will lead to a proliferation of the API.
>>>>
>>>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>>
>>>>> I have thoughts on the subject of whether to have APIs just for the
>>>>> lowest-level building blocks versus having APIs for higher-level
>>>>> constructs. Specifically this applies to providing only unsorted multimap
>>>>> vs what I will call "time-ordered buffer". TL;DR: I'd vote to focus on
>>>>> time-ordered buffer; if it turns out to be easy to go all the way to sorted
>>>>> multimap that's nice-to-have; if it turns out to be easy to implement on
>>>>> top of unsorted map state that should probably be under the hood.
>>>>>
>>>>> Reasons to build low-level multimap in the runner & fn api and layer
>>>>> higher-level things in the SDK:
>>>>>
>>>>>  - It is less implementation for runners if they have to only provide
>>>>> fewer lower-level building blocks like multimap state.
>>>>>  - There are many more runners than SDKs (and will be even more and
>>>>> more) so this saves overall.
>>>>>
>>>>> Reasons to build higher-level constructs directly in the runner and fn
>>>>> api:
>>>>>
>>>>>  - Having multiple higher-level state types may actually be less
>>>>> implementation than one complex state type, especially if they map to
>>>>> runner primitives.
>>>>>  - The runner may have better specialized implementations, especially
>>>>> for something like a time-ordered buffer.
>>>>>  - The particular access patterns in an SDK-based implementation may
>>>>> not be ideal for each runner's underlying implementation of the low-level
>>>>> building block.
>>>>>  - There may be excessive gRPC overhead even for optimal access
>>>>> patterns.
>>>>>
>>>>> There are ways to have best of both worlds, like:
>>>>>
>>>>> 1. Define multiple state types according to fundamental access
>>>>> patterns, like we did this before portability.
>>>>> 2. If it is easy to layer one on top of the other, do that inside the
>>>>> runner. Provide shared code so for runners providing the lowest-level
>>>>> primitive they get all the types for free.
>>>>>
>>>>> I understand that this is an oversimplification. It still creates some
>>>>> more work. And APIs are a burden so it is good to introduce as few as
>>>>> possible for maintenance. But it has performance benefits and also unblocks
>>>>> "just doing the simple and useful thing now" which I always like to do as
>>>>> long as it is compatible with future changes. If the APIs are fundamental,
>>>>> like sets, maps, timestamp ordering, then it is safe to guess that they
>>>>> will change rarely and be useful forever.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> I would be glad to take a stab at how to provide sorting on top of
>>>>>> unsorted multimap state.
>>>>>> Based upon your description, you want integer keys representing
>>>>>> timestamps and arbitrary user value for the values, is that correct?
>>>>>> What kinds of operations do you need on the sorted map state in order
>>>>>> of efficiency requirements?
>>>>>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)))
>>>>>> What kinds of operations do we expect the underlying unsorted map
>>>>>> state to be able to provide?
>>>>>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g.
>>>>>> enumerate(K)?)
>>>>>>
>>>>>> I went through a similar exercise of how to provide a list like side
>>>>>> input view over a multimap[1] side input which efficiently allowed
>>>>>> computation of size and provided random access while only having access to
>>>>>> get(K) and enumerate K's.
>>>>>>
>>>>>> 1:
>>>>>> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>>>>>>
>>>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Bringing this subject up again,
>>>>>>>
>>>>>>> I've spent some time looking into implementing this for the Dataflow
>>>>>>> runner. I'm unable to find a way to implement the arbitrary sorted multimap
>>>>>>> efficiently for the case where there are large numbers of unique keys.
>>>>>>> Since the primary driving use case is timestamp ordering (i.e. key is event
>>>>>>> timestamp), you would expect to have nearly a new key per element. I
>>>>>>> considered Luke's suggestion above, but unfortunately it doesn't really
>>>>>>> solve this issue.
>>>>>>>
>>>>>>> The primary use case for sorting always seems to be sorting by
>>>>>>> timestamp. I want to propose that instead of building the fully-general
>>>>>>> sorted multimap, we instead focus on a state type where the sort key is an
>>>>>>> integral type (like a timestamp or an integer). There is still a valid use
>>>>>>> case for multimap, but we can provide that as an unordered state. At least
>>>>>>> for Dataflow, it will be much easier.
>>>>>>>
>>>>>>> While my difficulties here may be specific to the Dataflow runner,
>>>>>>> any such support would have to be built into other runners as well, and
>>>>>>> limiting to integral sorting likely makes it easier for other runners to
>>>>>>> implement this. Also, if you look at this
>>>>>>> <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95> Flink
>>>>>>> comment pointed out by Aljoscha, for Flink the main use case identified was
>>>>>>> also timestamp sorting. This will also simplify the API design for this
>>>>>>> feature: Sorted multimap with arbitrary keys would require us to introduce
>>>>>>> a way of mapping natural ordering to encoded ordering (i.e. a new
>>>>>>> OrderPreservingCoder), but if we limit sort keys to integral types, the API
>>>>>>> design is simpler as integral types can be represented directly.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> This sounds to me like a potential runner strategy. However if a
>>>>>>>> runner can natively support sorted maps (e.g. we expect the Dataflow runner
>>>>>>>> to be able to do so, and I think it would be useful for other runners as
>>>>>>>> well), then it's probably preferable to allow the runner to use its native
>>>>>>>> capabilities.
>>>>>>>>
>>>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> For the API that you proposed, the map key is always "void" and
>>>>>>>>> the sort key == user key. So in my example of
>>>>>>>>> key: dummy value
>>>>>>>>> key.000: token, (0001, value4)
>>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>>>>>>>> key.01: token
>>>>>>>>> key.1: token, (1011, value3)
>>>>>>>>> you would have:
>>>>>>>>> "void": dummy value
>>>>>>>>> "void".000: token, (0001, value4)
>>>>>>>>> "void".001: token, (0010, value1), (0011, value2)
>>>>>>>>> "void".01: token
>>>>>>>>> "void".1: token, (1011, value3)
>>>>>>>>>
>>>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking
>>>>>>>>> the prefixes until you find a common prefix for K and then filtering for
>>>>>>>>> values that have a sort key <= K. Using the example above, to find
>>>>>>>>> entriesUntil(0010) you would:
>>>>>>>>> look for key."", miss
>>>>>>>>> look for key.0, miss
>>>>>>>>> look for key.00, miss
>>>>>>>>> look for key.000, hit, sort all contained values using secondary
>>>>>>>>> key, provide value4 to user
>>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we
>>>>>>>>> sort all contained values using secondary key, filter out value2 and
>>>>>>>>> provide value1
>>>>>>>>>
>>>>>>>>> void removeUntil(K limit) also translates into walking the
>>>>>>>>> prefixes but instead we will clear them when we have a "hit" with some
>>>>>>>>> special logic for when the sort key is a prefix of the key. Using the
>>>>>>>>> example, to removeUntil(0010) you would:
>>>>>>>>> look for key."", miss
>>>>>>>>> look for key.0, miss
>>>>>>>>> look for key.00, miss
>>>>>>>>> look for key.000, hit, clear
>>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we
>>>>>>>>> sort all contained values using secondary key, store in memory all values
>>>>>>>>> that are > 0010, clear and append values stored in memory.
>>>>>>>>>
>>>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Can you explain how fetching and deleting ranges of keys would
>>>>>>>>>> work with this data structure?
>>>>>>>>>>
>>>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <lc...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Reuven, for the example, I assume that we never want to store
>>>>>>>>>>> more than 2 values at a given sort key prefix, and if we do then we will
>>>>>>>>>>> create a new longer prefix splitting up the values based upon the sort key.
>>>>>>>>>>>
>>>>>>>>>>> Tuple representation in examples below is (key, sort key, value)
>>>>>>>>>>> and . is a character outside of the alphabet which can be represented by
>>>>>>>>>>> using an escaping encoding that wraps the key + sort key encoding.
>>>>>>>>>>>
>>>>>>>>>>> To insert (key, 0010, value1), we look up "key" + all the
>>>>>>>>>>> prefixes of 0010 finding one that is not empty. In this case it's "", so we
>>>>>>>>>>> append value to the map at key."" ending up with (we also set the key to any
>>>>>>>>>>> dummy value to know that it contains values):
>>>>>>>>>>> key: dummy value
>>>>>>>>>>> key."": token, (0010, value1)
>>>>>>>>>>> Now we insert (key, 0011, value2), we again lookup "key" + all
>>>>>>>>>>> the prefixes of 0011, finding "", so we append value2 to key."" ending up
>>>>>>>>>>> with:
>>>>>>>>>>> key: dummy value
>>>>>>>>>>> key."": token, (0010, value1), (0011, value2)
>>>>>>>>>>> Now we insert (key, 1011, value3), we again lookup "key" + all
>>>>>>>>>>> the prefixes of 1011 finding "" but notice that it is full, so we partition
>>>>>>>>>>> all the values into two prefixes 0 and 1. We also clear the "" prefix
>>>>>>>>>>> ending up with:
>>>>>>>>>>> key: dummy value
>>>>>>>>>>> key.0: token, (0010, value1), (0011, value2)
>>>>>>>>>>> key.1: token, (1011, value3)
>>>>>>>>>>> Now we insert (key, 0001, value4), we again lookup "key" + all
>>>>>>>>>>> the prefixes of the value finding 0 but notice that it is full, so we
>>>>>>>>>>> partition all the values into two prefixes 00 and 01 but notice this
>>>>>>>>>>> doesn't help us since 00 will be too full so we split 00 again to 000, 001.
>>>>>>>>>>> We also clear the 0 prefix ending up with:
>>>>>>>>>>> key: dummy value
>>>>>>>>>>> key.000: token, (0001, value4)
>>>>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>>>>>>>>>> key.01: token
>>>>>>>>>>> key.1: token, (1011, value3)
>>>>>>>>>>>
>>>>>>>>>>> We are effectively building a trie[1] where we only have values
>>>>>>>>>>> at the leaves and control how full each leaf can be. There are other trie
>>>>>>>>>>> representations like a radix tree that may be better.
>>>>>>>>>>>
>>>>>>>>>>> Looking up the values in sorted order for "key" would go like
>>>>>>>>>>> this:
>>>>>>>>>>> Is key set, yes
>>>>>>>>>>> look for key."", miss
>>>>>>>>>>> look for key.0, miss
>>>>>>>>>>> look for key.00, miss
>>>>>>>>>>> look for key.000, hit, sort all contained values using secondary
>>>>>>>>>>> key, provide value4 to user
>>>>>>>>>>> look for key.001, hit, sort all contained values using secondary
>>>>>>>>>>> key, provide value1 followed by value2 to user
>>>>>>>>>>> look for key.01, hit, empty, return no values to user
>>>>>>>>>>> look for key.1, hit, sort all contained values using secondary
>>>>>>>>>>> key, provide value3 to user
>>>>>>>>>>> we have walked the entire prefix space, signal end of iterable
>>>>>>>>>>>
>>>>>>>>>>> Some notes for the above:
>>>>>>>>>>> * The dummy value is used to know that the key contains values
>>>>>>>>>>> and the token is to know whether there are any values deeper in the trie so
>>>>>>>>>>> we know when to stop searching.
>>>>>>>>>>> * If we can recalculate the sort key from the combination of the
>>>>>>>>>>> key and value, then we don't need to store it.
>>>>>>>>>>> * Keys with lots of values will perform worse than keys with
>>>>>>>>>>> fewer values since we have to look up more keys but they will be empty
>>>>>>>>>>> reads. The number of misses can be controlled by how many elements we are
>>>>>>>>>>> willing to store at a given node before we subdivide.
>>>>>>>>>>>
>>>>>>>>>>> In reality you could build a lot of structures (e.g. red black
>>>>>>>>>>> tree, binary tree) using the sort key, the issue is the cost of
>>>>>>>>>>> rebalancing/re-organizing the structure in map form and whether it has a
>>>>>>>>>>> convenient pre-order traversal for lookups.
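
The insert and sorted-lookup walk-through above can be sketched in code. This is a simplified illustration under stated assumptions, not the proposed implementation: PrefixBucketSketch is a hypothetical name, a HashMap stands in for the unsorted map state, sort keys are fixed-width bit strings such as "0010", the dummy value and tokens are omitted, and a present-but-empty bucket plays the role of a leaf with no values.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: sorted access layered on an unordered prefix -> bucket map. */
class PrefixBucketSketch {
  static final int CAPACITY = 2; // max values per leaf before it is split

  // Unordered map standing in for unsorted map state. Each entry is a leaf
  // bucket of {sortKey, value} pairs. The leaf prefixes always cover the key space.
  final Map<String, List<String[]>> leaves = new HashMap<>();

  /** Inserts a value; all sort keys are assumed to be bit strings of equal width. */
  void insert(String sortKey, String value) {
    if (leaves.isEmpty()) {
      leaves.put("", new ArrayList<>());
    }
    // Walk the prefixes of the sort key, shortest first, until a leaf is hit.
    String prefix = "";
    for (int len = 0; len <= sortKey.length(); len++) {
      prefix = sortKey.substring(0, len);
      if (leaves.containsKey(prefix)) {
        break;
      }
    }
    leaves.get(prefix).add(new String[] {sortKey, value});
    // Split overfull leaves into two longer prefixes, repeating while needed.
    while (leaves.get(prefix).size() > CAPACITY && prefix.length() < sortKey.length()) {
      List<String[]> full = leaves.remove(prefix);
      leaves.put(prefix + "0", new ArrayList<>());
      leaves.put(prefix + "1", new ArrayList<>());
      for (String[] entry : full) {
        leaves.get(prefix + entry[0].charAt(prefix.length())).add(entry);
      }
      // Only the child on the new key's path can still be overfull.
      prefix = sortKey.substring(0, prefix.length() + 1);
    }
  }

  /** Returns all values ordered by sort key via a pre-order walk of the prefixes. */
  List<String> valuesInOrder() {
    List<String> out = new ArrayList<>();
    if (!leaves.isEmpty()) {
      visit("", out);
    }
    return out;
  }

  private void visit(String prefix, List<String> out) {
    List<String[]> bucket = leaves.get(prefix);
    if (bucket == null) { // a miss: descend to both longer prefixes
      visit(prefix + "0", out);
      visit(prefix + "1", out);
      return;
    }
    List<String[]> sorted = new ArrayList<>(bucket);
    sorted.sort(Comparator.comparing((String[] entry) -> entry[0]));
    for (String[] entry : sorted) {
      out.add(entry[1]);
    }
  }

  public static void main(String[] args) {
    PrefixBucketSketch state = new PrefixBucketSketch();
    state.insert("0010", "value1");
    state.insert("0011", "value2");
    state.insert("1011", "value3");
    state.insert("0001", "value4");
    System.out.println(state.valuesInOrder()); // [value4, value1, value2, value3]
  }
}
```

With the four inserts from the example, the sketch ends up with leaves 000, 001, 01 and 1, the same shape as the listing above. Range reads and range clears would follow the same walk, stopping early or filtering inside the boundary leaf.
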
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Some great comments!
>>>>>>>>>>>>
>>>>>>>>>>>> *Aljoscha*: absolutely this would have to be implemented by
>>>>>>>>>>>> runners to be efficient. We can of course provide a default (inefficient)
>>>>>>>>>>>> implementation, but ideally runners would provide better ones.
>>>>>>>>>>>>
>>>>>>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed by
>>>>>>>>>>>> this. E.g.
>>>>>>>>>>>>
>>>>>>>>>>>> *Robert* Great point about standard coders not satisfying
>>>>>>>>>>>> this. That's why I suggested that we provide a way to tag the coders that
>>>>>>>>>>>> do preserve order, and only accept those as key coders. Alternatively we
>>>>>>>>>>>> could present a more limited API - e.g. only allowing a hard-coded set of
>>>>>>>>>>>> types to be used as keys - but that seems counter to the direction Beam
>>>>>>>>>>>> usually goes. So users will have two ways of creating multimap state specs:
>>>>>>>>>>>>
>>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>> state =
>>>>>>>>>>>> StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>>>>>>>>
>>>>>>>>>>>> or
>>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>> state =
>>>>>>>>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>>>>>>>>
>>>>>>>>>>>> The second one will validate that the key coder preserves
>>>>>>>>>>>> order, and fails otherwise (similar to coder determinism checking in
>>>>>>>>>>>> GroupByKey). (BTW we would also have versions of these functions that use
>>>>>>>>>>>> coder inference to "guess" the coder, but those will do the same checking)
>>>>>>>>>>>>
>>>>>>>>>>>> Also the API I proposed did support random access! We could
>>>>>>>>>>>> separate out OrderedBagState again if we think the use cases are
>>>>>>>>>>>> fundamentally different. I merged the proposal into that of MultimapState
>>>>>>>>>>>> because there seemed to be 99% overlap.
>>>>>>>>>>>>
>>>>>>>>>>>> Reuven
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <
>>>>>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <re...@google.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <
>>>>>>>>>>>>> altay@google.com> wrote:
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <
>>>>>>>>>>>>> lcwik@google.com> wrote:
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <
>>>>>>>>>>>>> ruwang@google.com> wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> A few obvious problems with this code:
>>>>>>>>>>>>> >>>>>   1. Removing the elements already processed from the
>>>>>>>>>>>>> bag requires clearing and rewriting the entire bag. This is O(n^2) in the
>>>>>>>>>>>>> number of input trades.
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Why isn't it O(2 * n) to clear and rewrite the trade state?
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends State {
>>>>>>>>>>>>> >>>>>   // Add a value to the map.
>>>>>>>>>>>>> >>>>>   void put(K key, V value);
>>>>>>>>>>>>> >>>>>   // Get all values for a given key.
>>>>>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>>>>>>>>>>>>> >>>>>   // Return all entries in the map.
>>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>>>>>>>>>>>> >>>>>   // Return all entries in the map with keys <= limit.
>>>>>>>>>>>>> >>>>>   // Returned elements are sorted by the key.
>>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>   // Remove all values with the given key.
>>>>>>>>>>>>> >>>>>   void remove(K key);
>>>>>>>>>>>>> >>>>>   // Remove all entries in the map with keys <= limit.
>>>>>>>>>>>>> >>>>>   void removeUntil(K limit);
>>>>>>>>>>>>> >>>>> }
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Will removeUntilExcl(K limit) also be useful? It will remove
>>>>>>>>>>>>> all entries in the map with keys < limit.
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of the key.
>>>>>>>>>>>>> In order to make this easier for users, I propose that we introduce a new
>>>>>>>>>>>>> tag on Coders PreservesOrder. A Coder that contains this tag guarantees
>>>>>>>>>>>>> that the encoded value preserves the same ordering as the base Java type.
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Could you clarify what "encoded value preserves the
>>>>>>>>>>>>> same ordering as the base Java type" means?
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> Let's say A and B represent two different instances of the
>>>>>>>>>>>>> same Java type like a double, then A < B (using the language's comparison
>>>>>>>>>>>>> operator) iff encode(A) < encode(B) (note the encoded versions are compared
>>>>>>>>>>>>> lexicographically)
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect the A < B iff
>>>>>>>>>>>>> e(A) < e(B) property to hold for all languages we support? What happens if A
>>>>>>>>>>>>> and B sort differently in different languages?
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > That would have to be the property of the coder (which means
>>>>>>>>>>>>> that this property probably needs to be represented in the portability
>>>>>>>>>>>>> representation of the coder). I imagine the common use cases will be for
>>>>>>>>>>>>> simple coders like int, long, string, etc., which are likely to sort the
>>>>>>>>>>>>> same in most languages.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The standard coders for both double and integral types do not respect
>>>>>>>>>>>>> the natural ordering (consider negative values). KV coders violate the
>>>>>>>>>>>>> "natural" lexicographic ordering on components as well. I think
>>>>>>>>>>>>> implicitly sorting on encoded value would yield many surprises. (The
>>>>>>>>>>>>> state could, of course, take an order-preserving, bytes
>>>>>>>>>>>>> (string?)-producing callable as a parameter.) (As for
>>>>>>>>>>>>> naming, I'd probably call this OrderedBagState or something like
>>>>>>>>>>>>> that...rather than Map which tends to imply random access.)
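
To make the point about negative values concrete, here is a small sketch. It does not use any Beam coder: encodeNaive and encodeOrdered are hypothetical helpers that write a long as 8 fixed-width big-endian bytes, an encoding chosen only because the effect is easy to see. Comparing the naive bytes lexicographically (as unsigned bytes) puts -1 after 1; flipping the sign bit before encoding is one well-known way to make byte order agree with numeric order.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of why byte-wise ordering can disagree with numeric ordering. */
class OrderPreservingEncodingSketch {

  /** Plain big-endian two's-complement bytes (ByteBuffer is big-endian by default). */
  static byte[] encodeNaive(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  /** Flips the sign bit so that unsigned byte order matches signed numeric order. */
  static byte[] encodeOrdered(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
  }

  /** Lexicographic comparison treating each byte as unsigned. */
  static int compareBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // Is -1 ordered before 1 when comparing encoded bytes?
    System.out.println(compareBytes(encodeNaive(-1L), encodeNaive(1L)) < 0);     // false
    System.out.println(compareBytes(encodeOrdered(-1L), encodeOrdered(1L)) < 0); // true
  }
}
```

Variable-length encodings and composite values such as KVs raise further issues that this sketch does not cover.
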
>>>>>>>>>>>>>
>>>>>>>>>>>>

Re: [External] Re: DISCUSS: Sorted MapState API

Posted by Tyson Hamilton <ty...@google.com>.
On Mon, Sep 28, 2020 at 9:24 AM Kenneth Knowles <ke...@apache.org> wrote:

>
>
> On Wed, Sep 16, 2020 at 8:48 AM Tyson Hamilton <ty...@google.com> wrote:
>
>> The use case is to support an unbounded stream-stream join, where the
>> elements are arriving in roughly time sorted order. Removing a specific
>> element from the timestamp indexed collection is necessary when a match is
>> found.
>>
>
> Just checking - this is an optimization when you already know that the
> join is 1:1?
>

Yes, I suppose my current implementation of a stream-stream join assumes
1:1 joins. If there is a case for other cardinalities we should have a
discussion about this in more detail (in another thread).


> Kenn
>
>
>> Having clearRange is helpful to expire elements that are no longer
>> relevant according to a user-provided time based join predicate (e.g. WHEN
>> ABS(leftElement.timestamp - rightElement.timestamp) < 5 minutes).
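
As a rough illustration of how a range clear supports that kind of predicate, here is a sketch of one side of such a join buffer. Everything in it is an assumption made for illustration: JoinBufferSketch is a hypothetical name, a TreeMap stands in for the ordered list state, timestamps are epoch milliseconds, and expiry is driven by a watermark below which no further left elements are expected.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical sketch: buffering right-side elements for a time-bounded join. */
class JoinBufferSketch {
  // Join predicate: ABS(left.timestamp - right.timestamp) < 5 minutes.
  static final long WINDOW_MILLIS = Duration.ofMinutes(5).toMillis();

  private final TreeMap<Long, List<String>> rightBuffer = new TreeMap<>();

  void addRight(long timestampMillis, String value) {
    rightBuffer.computeIfAbsent(timestampMillis, ts -> new ArrayList<>()).add(value);
  }

  /** Right elements whose timestamp is strictly within the window around leftTs. */
  List<String> matchesFor(long leftTs) {
    List<String> out = new ArrayList<>();
    for (List<String> bucket :
        rightBuffer.subMap(leftTs - WINDOW_MILLIS, false, leftTs + WINDOW_MILLIS, false).values()) {
      out.addAll(bucket);
    }
    return out;
  }

  /**
   * Drops right elements that can no longer match, assuming every future left
   * element has a timestamp >= watermarkMillis.
   */
  void expire(long watermarkMillis) {
    // headMap is a live view; clearing it acts like a clearRange up to the cutoff.
    rightBuffer.headMap(watermarkMillis - WINDOW_MILLIS, true).clear();
  }

  int bufferedTimestamps() {
    return rightBuffer.size();
  }

  public static void main(String[] args) {
    JoinBufferSketch buffer = new JoinBufferSketch();
    buffer.addRight(0L, "r0");
    buffer.addRight(Duration.ofMinutes(4).toMillis(), "r4");
    buffer.addRight(Duration.ofMinutes(10).toMillis(), "r10");
    System.out.println(buffer.matchesFor(Duration.ofMinutes(5).toMillis())); // [r4]
    buffer.expire(Duration.ofMinutes(5).toMillis());
    System.out.println(buffer.bufferedTimestamps()); // 2
  }
}
```
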
>>
>> I'll think a bit more on how to use MapState instead if having a remove()
>> like method for a single element isn't an option.
>>
>> On Tue, Sep 15, 2020 at 8:52 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Hi,
>>>
>>> Currently we only support removing a timestamp range. You can remove a
>>> single timestamp of course by removing [ts, ts+1), however if there are
>>> multiple elements with the same timestamp this will remove all of those
>>> elements.
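
The difference between clearing [ts, ts+1) and removing one specific element can be shown with a small sketch. It is illustrative only: SingleTimestampRemovalSketch is a hypothetical name, a TreeMap stands in for the state, timestamps are epoch milliseconds, and removeOne is a hypothetical single-element removal that is not part of the API described here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical sketch contrasting range removal with single-element removal. */
class SingleTimestampRemovalSketch {

  /** Removes every element with startTs <= timestamp < endTs. */
  static void clearRange(TreeMap<Long, List<String>> state, long startTs, long endTs) {
    state.subMap(startTs, true, endTs, false).clear();
  }

  /** Hypothetical: removes one matching element and leaves other duplicates in place. */
  static boolean removeOne(TreeMap<Long, List<String>> state, long ts, String value) {
    List<String> bucket = state.get(ts);
    if (bucket == null || !bucket.remove(value)) {
      return false;
    }
    if (bucket.isEmpty()) {
      state.remove(ts);
    }
    return true;
  }

  /** Two elements share timestamp 100; one element sits at timestamp 200. */
  static TreeMap<Long, List<String>> sample() {
    TreeMap<Long, List<String>> state = new TreeMap<>();
    state.put(100L, new ArrayList<>(Arrays.asList("a", "b")));
    state.put(200L, new ArrayList<>(Arrays.asList("c")));
    return state;
  }

  public static void main(String[] args) {
    TreeMap<Long, List<String>> byRange = sample();
    clearRange(byRange, 100L, 101L); // removes both "a" and "b"
    System.out.println(byRange);     // {200=[c]}

    TreeMap<Long, List<String>> byElement = sample();
    removeOne(byElement, 100L, "a"); // removes only "a"
    System.out.println(byElement);   // {100=[b], 200=[c]}
  }
}
```
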
>>>
>>> Does this fit your use case? If not, I wonder if MapState is closer to
>>> what you are looking for?
>>>
>>> Reuven
>>>
>>> On Tue, Sep 15, 2020 at 2:33 PM Tyson Hamilton <ty...@google.com>
>>> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> I noticed that there was an implementation of the in-memory
>>>> OrderedListState introduced [1]. Where can I find out more regarding the
>>>> plan and design? Is there a design doc? I'd like to know more details about
>>>> the implementation to see if it fits my use case. I was hoping it would
>>>> have a remove(TimestampedValue<T> e) method.
>>>>
>>>> Thanks,
>>>> -Tyson
>>>>
>>>>
>>>> [1]:
>>>> https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41
>>>>
>>>>
>>>> On 2020/08/03 21:41:46, Catlyn Kong <ca...@yelp.com> wrote:
>>>> > Hey folks,
>>>> >
>>>> > Sry I'm late to this thread but this might be very helpful for the
>>>> problem
>>>> > we're dealing with. Do we have a design doc or a jira ticket I can
>>>> follow?
>>>> >
>>>> > Cheers,
>>>> > Catlyn
>>>> >
>>>> > On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>> >
>>>> > > My questions were just an example. I fully agree there is a
>>>> fundamental
>>>> > > need for a sorted state (of some form, and I also think this links
>>>> to
>>>> > > efficient implementation of retrations) - I was reacting to Kenn's
>>>> question
>>>> > > about BIP. This one would be pretty nice example why it would be
>>>> good to
>>>> > > have such a "process" - not everything can be solved on ML and
>>>> there are
>>>> > > fundamental decisions that might need a closer attention.
>>>> > > On 6/18/20 5:28 PM, Reuven Lax wrote:
>>>> > >
>>>> > > Jan - my proposal is exactly TimeSortedBagState (more accurately -
>>>> > > TimeSortedListState), though I went a bit further and also proposed
>>>> a way
>>>> > > to have a dynamic number of tagged TimeSortedBagStates.
>>>> > >
>>>> > > You are correct that the runner doesn't really have to store the
>>>> data time
>>>> > > sorted - what's actually needed is the ability to fetch and remove
>>>> > > timestamp ranges of data (though that does include fetching the
>>>> entire
>>>> > > list); TimeOrderedState is probably a more accurate name then
>>>> > > TimeSortedState. I don't think we could get away with operations
>>>> that only
>>>> > > act on the smallest timestamp, however we could limit the API to
>>>> only being
>>>> > > able to fetch and remove prefixes of data (ordered by timestamp).
>>>> However
>>>> > > if we support prefixes, we might as well support arbitrary
>>>> subranges.
>>>> > >
>>>> > > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <je...@seznam.cz>
>>>> wrote:
>>>> > >
>>>> > >> Big +1 for a BIP, as this might really help clarify all the pros
>>>> and cons
>>>> > >> of all possibilities. There seem to be questions that need
>>>> answering and
>>>> > >> motivating use cases - do we need sorted map state or can we solve
>>>> our use
>>>> > >> cases by something simpler - e.g. the mentioned
>>>> TimeSortedBagState? Does
>>>> > >> that really have to be time-sorted structure, or does it "only"
>>>> have to
>>>> > >> have operations that can efficiently find and remove element with
>>>> smallest
>>>> > >> timestamp (like a PriorityQueue)?
>>>> > >>
>>>> > >> Jan
>>>> > >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
>>>> > >>
>>>> > >> Zooming in from generic philosophy to be clear: adding time ordered
>>>> > >> buffer to the Fn state API is *not* a shortcut.It has benefits
>>>> that will
>>>> > >> not be achieved by SDK-side implementation on top of either
>>>> ordered or
>>>> > >> unordered multimap. Are those benefits worth expanding the API? I
>>>> don't
>>>> > >> know.
>>>> > >>
>>>> > >> A change to allow a runner to have a specialized implementation for
>>>> > >> time-buffered state would be one or more StateKey types, right?
>>>> Reuven,
>>>> > >> maybe put this and your Java API in a doc? A BIP? Seems like
>>>> there's at
>>>> > >> least the following to explore:
>>>> > >>
>>>> > >>  - how that Java API would map to an SDK-side implementation on
>>>> top of
>>>> > >> multimap state key
>>>> > >>  - how that Java API would map to a new StateKey
>>>> > >>  - whether there's actually more than one relevant implementation
>>>> of that
>>>> > >> StateKey
>>>> > >>  - whether SDK-side implementation on some other state key would be
>>>> > >> performant enough in all SDK languages (present and future)
>>>> > >>
>>>> > >> Zooming back out to generic philosophy: Proliferation of StateKey
>>>> > >> types tuned by runners (which can very easily still share
>>>> implementation)
>>>> > >> is probably better than proliferation of complex SDK-side
>>>> implementations
>>>> > >> with varying completeness and performance.
>>>> > >>
>>>> > >> Kenn
>>>> > >>
>>>> > >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <re...@google.com>
>>>> wrote:
>>>> > >>
>>>> > >>> It might help for me to describe what I have in mind. I'm still
>>>> > >>> proposing that we build multimap, just not a globally-sorted
>>>> multimap.
>>>> > >>>
>>>> > >>> My previous proposal was that we provide a Multimap<Key, Value>
>>>> state
>>>> > >>> type, sorted by key. This would have two additional operations -
>>>> > >>> multimap.getRange(startKey, endKey) and
>>>> multimap.deleteRange(startKey,
>>>> > >>> endKey). The primary use case was timestamp sorting, but I felt
>>>> that a
>>>> > >>> sorted multimap provided a nice generalization - after all, you
>>>> can simply
>>>> > >>> key the multimap by timestamp to get timestamp sorting.
>>>> > >>>
>>>> > >>> This approach had some issues immediately that would take some
>>>> work to
>>>> > >>> solve. Since a multimap key can have any type and a runner will
>>>> only be
>>>> > >>> able to sort by encoded type, we would need to introduce a
>>>> concept of
>>>> > >>> order-preserving coders into Beam and plumb that through. Robert
>>>> pointed
>>>> > >>> out that even our existing standard coders for simple integral
>>>> types don't
>>>> > >>> preserve order, so there will likely be surprises here.
>>>> > >>>
>>>> > >>> My current proposal is for a multimap that is not sorted by key,
>>>> but
>>>> > >>> that can support ordered values for a single key. Remember that a
>>>> multimap
>>>> > >>> maps K -> Iterable<V>, so this means that each individual
>>>> Iterable<V> is
>>>> > >>> ordered, but the keys have no specific order relative to each
>>>> other. This
>>>> > >>> is not too different from many multimap implementations where the
>>>> keys are
>>>> > >>> unordered, but the list of values for a single key at least has a
>>>> stable
>>>> > >>> order.
>>>> > >>>
>>>> > >>> The interface would look like this:
>>>> > >>>
>>>> > >>> public interface MultimapState<K, V> extends State {
>>>> > >>>   // Add a value with a default timestamp.
>>>> > >>>   void put(K key, V value);
>>>> > >>>
>>>> > >>>   // Add a timestamped value.
>>>> > >>>   void put(K key, TimestampedValue<V> value);
>>>> > >>>
>>>> > >>>   // Remove all values for a key.
>>>> > >>>   void remove(K key);
>>>> > >>>
>>>> > >>>   // Remove all values for a key with timestamps within the
>>>> specified
>>>> > >>> range.
>>>> > >>>   void removeRange(K key, Instant startTs, Instant endTs);
>>>> > >>>
>>>> > >>>   // Get an Iterable of values for V. The Iterable will be
>>>> returned
>>>> > >>> sorted by timestamp.
>>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
>>>> > >>>
>>>> > >>>   // Get an Iterable of values for V in the specified range. The
>>>> > >>> Iterable will be returned sorted by timestamp.
>>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> getRange(K key,
>>>> Instant
>>>> > >>> startTs, Instant endTs);
>>>> > >>>
>>>> > >>>   ReadableState<Iterable<K>> keys();
>>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> values();
>>>> > >>>   ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>>
>>>> entries();
>>>> > >>> }
>>>> > >>>
>>>> > >>> We can of course provide helper functions that allow using
>>>> MultimapState
>>>> > >>> without dealing with TimestampedValue for users who only want a
>>>> multimap and
>>>> > >>> don't want sorting.
>>>> > >>>
>>>> > >>> I think many users will only need a single sorted list - not a
>>>> full
>>>> > >>> multimap. It's worth offering this as well, and we can simply
>>>> build it on
>>>> > >>> top of MultimapState. It will look like an extension of BagState:
>>>> > >>>
>>>> > >>> public interface TimestampSortedListState<T> extends State {
>>>> > >>>   void add(TimestampedValue<T> value);
>>>> > >>>   Iterable<TimestampedValue<T>> read();
>>>> > >>>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant
>>>> > >>> endTs);
>>>> > >>>   void clearRange(Instant startTs, Instant endTs);
>>>> > >>> }
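To make the range semantics of the list interface above concrete, here is a minimal in-memory sketch. It is not the proposed Beam implementation: the class name is hypothetical, timestamps are plain epoch-millis longs instead of Instant so the example needs no Beam or Joda classes, and the half-open range [startTs, endTs) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the proposed TimestampSortedListState<T>.
// Assumption: ranges are half-open, [startTs, endTs).
class InMemoryTimestampSortedList<T> {
  // TreeMap keeps timestamps ordered; one timestamp may hold several values.
  private final TreeMap<Long, List<T>> byTimestamp = new TreeMap<>();

  void add(long timestampMillis, T value) {
    byTimestamp.computeIfAbsent(timestampMillis, ts -> new ArrayList<>()).add(value);
  }

  // All values, ordered by timestamp (insertion order within one timestamp).
  List<T> read() {
    return flatten(byTimestamp);
  }

  // Values with startTs <= timestamp < endTs, ordered by timestamp.
  List<T> readRange(long startTs, long endTs) {
    return flatten(byTimestamp.subMap(startTs, true, endTs, false));
  }

  // Clearing the subMap view removes the entries from the backing map.
  void clearRange(long startTs, long endTs) {
    byTimestamp.subMap(startTs, true, endTs, false).clear();
  }

  private static <T> List<T> flatten(Map<Long, List<T>> view) {
    List<T> out = new ArrayList<>();
    for (List<T> values : view.values()) {
      out.addAll(values);
    }
    return out;
  }
}
```

Note that with this shape, clearRange(ts, ts + 1) drops every value stored at ts, not just one of them; removing a single element among several with the same timestamp would need an extra operation.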
>>>> > >>>
>>>> > >>>
>>>> > >>> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <lc...@google.com>
>>>> wrote:
>>>> > >>>
>>>> > >>>> The portability layer is meant to live across multiple versions
>>>> of Beam
>>>> > >>>> and I don't think it should be treated by doing the simple and
>>>> useful thing
>>>> > >>>> now since I believe it will lead to a proliferation of the API.
>>>> > >>>>
>>>> > >>>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <kenn@apache.org
>>>> >
>>>> > >>>> wrote:
>>>> > >>>>
>>>> > >>>>> I have thoughts on the subject of whether to have APIs just for
>>>> the
>>>> > >>>>> lowest-level building blocks versus having APIs for higher-level
>>>> > >>>>> constructs. Specifically this applies to providing only
>>>> unsorted multimap
>>>> > >>>>> vs what I will call "time-ordered buffer". TL;DR: I'd vote to
>>>> focus on
>>>> > >>>>> time-ordered buffer; if it turns out to be easy to go all the
>>>> way to sorted
>>>> > >>>>> multimap that's nice-to-have; if it turns out to be easy to
>>>> implement on
>>>> > >>>>> top of unsorted map state that should probably be under the hood.
>>>> > >>>>>
>>>> > >>>>> Reasons to build low-level multimap in the runner & fn api and
>>>> layer
>>>> > >>>>> higher-level things in the SDK:
>>>> > >>>>>
>>>> > >>>>>  - It is less implementation for runners if they have to only
>>>> provide
>>>> > >>>>> fewer lower-level building blocks like multimap state.
>>>> > >>>>>  - There are many more runners than SDKs (and will be even more
>>>> and
>>>> > >>>>> more) so this saves overall.
>>>> > >>>>>
>>>> > >>>>> Reasons to build higher-level constructs directly in the runner
>>>> and fn
>>>> > >>>>> api:
>>>> > >>>>>
>>>> > >>>>>  - Having multiple higher-level state types may actually be less
>>>> > >>>>> implementation than one complex state type, especially if they
>>>> map to
>>>> > >>>>> runner primitives.
>>>> > >>>>>  - The runner may have better specialized implementations,
>>>> especially
>>>> > >>>>> for something like a time-ordered buffer.
>>>> > >>>>>  - The particular access patterns in an SDK-based
>>>> implementation may
>>>> > >>>>> not be ideal for each runner's underlying implementation of the
>>>> low-level
>>>> > >>>>> building block.
>>>> > >>>>>  - There may be excessive gRPC overhead even for optimal access
>>>> > >>>>> patterns.
>>>> > >>>>>
>>>> > >>>>> There are ways to have best of both worlds, like:
>>>> > >>>>>
>>>> > >>>>> 1. Define multiple state types according to fundamental access
>>>> > >>>>> patterns, like we did this before portability.
>>>> > >>>>> 2. If it is easy to layer one on top of the other, do that
>>>> inside the
>>>> > >>>>> runner. Provide shared code so for runners providing the
>>>> lowest-level
>>>> > >>>>> primitive they get all the types for free.
>>>> > >>>>>
>>>> > >>>>> I understand that this is an oversimplification. It still
>>>> creates some
>>>> > >>>>> more work. And APIs are a burden so it is good to introduce as
>>>> few as
>>>> > >>>>> possible for maintenance. But it has performance benefits and
>>>> also unblocks
>>>> > >>>>> "just doing the simple and useful thing now" which I always
>>>> like to do as
>>>> > >>>>> long as it is compatible with future changes. If the APIs are
>>>> fundamental,
>>>> > >>>>> like sets, maps, timestamp ordering, then it is safe to guess
>>>> that they
>>>> > >>>>> will change rarely and be useful forever.
>>>> > >>>>>
>>>> > >>>>> Kenn
>>>> > >>>>>
>>>> > >>>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <lc...@google.com>
>>>> wrote:
>>>> > >>>>>
>>>> > >>>>>> I would be glad to take a stab at how to provide sorting on
>>>> top of
>>>> > >>>>>> unsorted multimap state.
>>>> > >>>>>> Based upon your description, you want integer keys representing
>>>> > >>>>>> timestamps and arbitrary user value for the values, is that
>>>> correct?
>>>> > >>>>>> What kinds of operations do you need on the sorted map state
>>>> in order
>>>> > >>>>>> of efficiency requirements?
>>>> > >>>>>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)),
>>>> ClearAll(Range[x, y))
>>>> > >>>>>> What kinds of operations do we expect the underlying unsorted
>>>> map
>>>> > >>>>>> state to be able to provide?
>>>> > >>>>>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g.
>>>> > >>>>>> enumerate(K)?)
>>>> > >>>>>>
>>>> > >>>>>> I went through a similar exercise of how to provide a list
>>>> like side
>>>> > >>>>>> input view over a multimap[1] side input which efficiently
>>>> allowed
>>>> > >>>>>> computation of size and provided random access while only
>>>> having access to
>>>> > >>>>>> get(K) and enumerate K's.
>>>> > >>>>>>
>>>> > >>>>>> 1:
>>>> > >>>>>>
>>>> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>>>> > >>>>>>
>>>> > >>>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <re...@google.com>
>>>> wrote:
>>>> > >>>>>>
>>>> > >>>>>>> Bringing this subject up again,
>>>> > >>>>>>>
>>>> > >>>>>>> I've spent some time looking into implementing this for the
>>>> Dataflow
>>>> > >>>>>>> runner. I'm unable to find a way to implement the arbitrary
>>>> sorted multimap
>>>> > >>>>>>> efficiently for the case where there are large numbers of
>>>> unique keys.
>>>> > >>>>>>> Since the primary driving use case is timestamp ordering
>>>> (i.e. key is event
>>>> > >>>>>>> timestamp), you would expect to have nearly a new key per
>>>> element. I
>>>> > >>>>>>> considered Luke's suggestion above, but unfortunately it
>>>> doesn't really
>>>> > >>>>>>> solve this issue.
>>>> > >>>>>>>
>>>> > >>>>>>> The primary use case for sorting always seems to be sorting by
>>>> > >>>>>>> timestamp. I want to propose that instead of building the
>>>> fully-general
>>>> > >>>>>>> sorted multimap, we instead focus on a state type where the
>>>> sort key is an
>>>> > >>>>>>> integral type (like a timestamp or an integer). There is
>>>> still a valid use
>>>> > >>>>>>> case for multimap, but we can provide that as an unordered
>>>> state. At least
>>>> > >>>>>>> for Dataflow, it will be much easier.
>>>> > >>>>>>>
>>>> > >>>>>>> While my difficulties here may be specific to the Dataflow
>>>> runner,
>>>> > >>>>>>> any such support would have to be built into other runners as
>>>> well, and
>>>> > >>>>>>> limiting to integral sorting likely makes it easier for other
>>>> runners to
>>>> > >>>>>>> implement this. Also, if you look at this
>>>> > >>>>>>> <
>>>> https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>
>>>> Flink
>>>> > >>>>>>> comment pointed out by Aljoscha, for Flink the main use case
>>>> identified was
>>>> > >>>>>>> also timestamp sorting. This will also simplify the API
>>>> design for this
>>>> > >>>>>>> feature: Sorted multimap with arbitrary keys would require us
>>>> to introduce
>>>> > >>>>>>> a way of mapping natural ordering to encoded ordering (i.e. a
>>>> new
>>>> > >>>>>>> OrderPreservingCoder), but if we limit sort keys to integral
>>>> types, the API
>>>> > >>>>>>> design is simpler as integral types can be represented
>>>> directly.
>>>> > >>>>>>>
>>>> > >>>>>>> Reuven
>>>> > >>>>>>>
>>>> > >>>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <re...@google.com>
>>>> wrote:
>>>> > >>>>>>>
>>>> > >>>>>>>> This sounds to me like a potential runner strategy. However
>>>> if a
>>>> > >>>>>>>> runner can natively support sorted maps (e.g. we expect the
>>>> Dataflow runner
>>>> > >>>>>>>> to be able to do so, and I think it would be useful for
>>>> other runners as
>>>> > >>>>>>>> well), then it's probably preferable to allow the runner to
>>>> use its native
>>>> > >>>>>>>> capabilities.
>>>> > >>>>>>>>
>>>> > >>>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <
>>>> lcwik@google.com>
>>>> > >>>>>>>> wrote:
>>>> > >>>>>>>>
>>>> > >>>>>>>>> For the API that you proposed, the map key is always "void"
>>>> and
>>>> > >>>>>>>>> the sort key == user key. So in my example of
>>>> > >>>>>>>>> key: dummy value
>>>> > >>>>>>>>> key.000: token, (0001, value4)
>>>> > >>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>>> > >>>>>>>>> key.01: token
>>>> > >>>>>>>>> key.1: token, (1011, value3)
>>>> > >>>>>>>>> you would have:
>>>> > >>>>>>>>> "void": dummy value
>>>> > >>>>>>>>> "void".000: token, (0001, value4)
>>>> > >>>>>>>>> "void".001: token, (0010, value1), (0011, value2)
>>>> > >>>>>>>>> "void".01: token
>>>> > >>>>>>>>> "void".1: token, (1011, value3)
>>>> > >>>>>>>>>
>>>> > >>>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into
>>>> walking
>>>> > >>>>>>>>> the prefixes until you find a common prefix for K and
>>>> then filter for
>>>> > >>>>>>>>> values where they have a sort key <= K. Using the example
>>>> above, to find
>>>> > >>>>>>>>> entriesUntil(0010) you would:
>>>> > >>>>>>>>> look for key."", miss
>>>> > >>>>>>>>> look for key.0, miss
>>>> > >>>>>>>>> look for key.00, miss
>>>> > >>>>>>>>> look for key.000, hit, sort all contained values using
>>>> secondary
>>>> > >>>>>>>>> key, provide value4 to user
>>>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010
>>>> so we
>>>> > >>>>>>>>> sort all contained values using secondary key, filter out
>>>> value2 and
>>>> > >>>>>>>>> provide value1
>>>> > >>>>>>>>>
>>>> > >>>>>>>>> void removeUntil(K limit) also translates into walking the
>>>> > >>>>>>>>> prefixes but instead we will clear them when we have a
>>>> "hit" with some
>>>> > >>>>>>>>> special logic for when the sort key is a prefix of the key.
>>>> Using the
>>>> > >>>>>>>>> example, to removeUntil(0010) you would:
>>>> > >>>>>>>>> look for key."", miss
>>>> > >>>>>>>>> look for key.0, miss
>>>> > >>>>>>>>> look for key.00, miss
>>>> > >>>>>>>>> look for key.000, hit, clear
>>>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010
>>>> so we
>>>> > >>>>>>>>> sort all contained values using secondary key, store in
>>>> memory all values
>>>> > >>>>>>>>> that are > 0010, clear and append values stored in memory.
>>>> > >>>>>>>>>
>>>> > >>>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <
>>>> relax@google.com>
>>>> > >>>>>>>>> wrote:
>>>> > >>>>>>>>>
>>>> > >>>>>>>>>> Can you explain how fetching and deleting ranges of keys
>>>> would
>>>> > >>>>>>>>>> work with this data structure?
>>>> > >>>>>>>>>>
>>>> > >>>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <
>>>> lcwik@google.com>
>>>> > >>>>>>>>>> wrote:
>>>> > >>>>>>>>>>
>>>> > >>>>>>>>>>> Reuven, for the example, I assume that we never want to
>>>> store
>>>> > >>>>>>>>>>> more than 2 values at a given sort key prefix, and if we
>>>> do then we will
>>>> > >>>>>>>>>>> create a new longer prefix splitting up the values based
>>>> upon the sort key.
>>>> > >>>>>>>>>>>
>>>> > >>>>>>>>>>> Tuple representation in examples below is (key, sort key,
>>>> value)
>>>> > >>>>>>>>>>> and . is a character outside of the alphabet which can be
>>>> represented by
>>>> > >>>>>>>>>>> using an escaping encoding that wraps the key + sort key
>>>> encoding.
>>>> > >>>>>>>>>>>
>>>> > >>>>>>>>>>> To insert (key, 0010, value1), we lookup "key" + all the
>>>> > >>>>>>>>>>> prefixes of 0010 finding one that is not empty. In this
>>>> case its 0, so we
>>>> > >>>>>>>>>>> append value to the map at key.0 ending up with (we also
>>>> set the key to any
>>>> > >>>>>>>>>>> dummy value to know that it contains values):
>>>> > >>>>>>>>>>> key: dummy value
>>>> > >>>>>>>>>>> key."": token, (0010, value1)
>>>> > >>>>>>>>>>> Now we insert (key, 0011, value2), we again lookup "key"
>>>> + all
>>>> > >>>>>>>>>>> the prefixes of 0011, finding "", so we append value2 to
>>>> key."" ending up
>>>> > >>>>>>>>>>> with:
>>>> > >>>>>>>>>>> key: dummy value
>>>> > >>>>>>>>>>> key."": token, (0010, value1), (0011, value2)
>>>> > >>>>>>>>>>> Now we insert (key, 1011, value3), we again lookup "key"
>>>> + all
>>>> > >>>>>>>>>>> the prefixes of 1011 finding "" but notice that it is
>>>> full, so we partition
>>>> > >>>>>>>>>>> all the values into two prefixes 0 and 1. We also clear
>>>> the "" prefix
>>>> > >>>>>>>>>>> ending up with:
>>>> > >>>>>>>>>>> key: dummy value
>>>> > >>>>>>>>>>> key.0: token, (0010, value1), (0011, value2)
>>>> > >>>>>>>>>>> key.1: token, (1011, value3)
>>>> > >>>>>>>>>>> Now we insert (key, 0001, value4), we again lookup "key"
>>>> + all
>>>> > >>>>>>>>>>> the prefixes of the value finding 0 but notice that it is
>>>> full, so we
>>>> > >>>>>>>>>>> partition all the values into two prefixes 00 and 01 but
>>>> notice this
>>>> > >>>>>>>>>>> doesn't help us since 00 will be too full so we split 00
>>>> again to 000, 001.
>>>> > >>>>>>>>>>> We also clear the 0 prefix ending up with:
>>>> > >>>>>>>>>>> key: dummy value
>>>> > >>>>>>>>>>> key.000: token, (0001, value4)
>>>> > >>>>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>>> > >>>>>>>>>>> key.01: token
>>>> > >>>>>>>>>>> key.1: token, (1011, value3)
>>>> > >>>>>>>>>>>
>>>> > >>>>>>>>>>> We are effectively building a trie[1] where we only have
>>>> values
>>>> > >>>>>>>>>>> at the leaves and control how full each leaf can be.
>>>> There are other trie
>>>> > >>>>>>>>>>> representations like a radix tree that may be better.
>>>> > >>>>>>>>>>>
>>>> > >>>>>>>>>>> Looking up the values in sorted order for "key" would go
>>>> like
>>>> > >>>>>>>>>>> this:
>>>> > >>>>>>>>>>> Is key set, yes
>>>> > >>>>>>>>>>> look for key."", miss
>>>> > >>>>>>>>>>> look for key.0, miss
>>>> > >>>>>>>>>>> look for key.00, miss
>>>> > >>>>>>>>>>> look for key.000, hit, sort all contained values using
>>>> secondary
>>>> > >>>>>>>>>>> key, provide value4 to user
>>>> > >>>>>>>>>>> look for key.001, hit, sort all contained values using
>>>> secondary
>>>> > >>>>>>>>>>> key, provide value1 followed by value2 to user
>>>> > >>>>>>>>>>> look for key.01, hit, empty, return no values to user
>>>> > >>>>>>>>>>> look for key.1, hit, sort all contained values using
>>>> secondary
>>>> > >>>>>>>>>>> key, provide value3 to user
>>>> > >>>>>>>>>>> we have walked the entire prefix space, signal end of
>>>> iterable
>>>> > >>>>>>>>>>>
>>>> > >>>>>>>>>>> Some notes for the above:
>>>> > >>>>>>>>>>> * The dummy value is used to know that the key contains
>>>> values
>>>> > >>>>>>>>>>> and the token is to know whether there are any values
>>>> deeper in the trie so
>>>> > >>>>>>>>>>> we know when to stop searching.
>>>> > >>>>>>>>>>> * If we can recalculate the sort key from the combination
>>>> of the
>>>> > >>>>>>>>>>> key and value, then we don't need to store it.
>>>> > >>>>>>>>>>> * Keys with lots of values will perform worse than keys
>>>> with
>>>> > >>>>>>>>>>> fewer values since we have to look up more keys but they
>>>> will be empty
>>>> > >>>>>>>>>>> reads. The number of misses can be controlled by how many
>>>> elements we are
>>>> > >>>>>>>>>>> willing to store at a given node before we subdivide.
>>>> > >>>>>>>>>>>
>>>> > >>>>>>>>>>> In reality you could build a lot of structures (e.g. red
>>>> black
>>>> > >>>>>>>>>>> tree, binary tree) using the sort key, the issue is the
>>>> cost of
>>>> > >>>>>>>>>>> rebalancing/re-organizing the structure in map form and
>>>> whether it has a
>>>> > >>>>>>>>>>> convenient pre-order traversal for lookups.
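The insert and lookup walkthrough above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, a HashMap stands in for the unsorted map state, sort keys are fixed-width bit strings, each leaf holds at most 2 entries, and the presence of a map key plays the role of the "token". It is not code from any runner or SDK.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: sorted reads on top of an unsorted map, keyed by bit-string prefixes.
class PrefixBucketSketch {
  static final int MAX_PER_LEAF = 2; // leaf capacity used in the walkthrough

  // Unsorted "map state": prefix -> entries of {sortKey, value}.
  // A prefix that is present in the map is a leaf (the "token").
  final Map<String, List<String[]>> buckets = new HashMap<>();

  void insert(String sortKey, String value) {
    String leaf = findLeaf(sortKey);
    if (leaf == null) { // empty structure: start with the "" prefix
      leaf = "";
      buckets.put(leaf, new ArrayList<>());
    }
    buckets.get(leaf).add(new String[] {sortKey, value});
    split(leaf);
  }

  // Walk the prefixes of sortKey, shortest first, until one is a leaf.
  private String findLeaf(String sortKey) {
    for (int len = 0; len <= sortKey.length(); len++) {
      String prefix = sortKey.substring(0, len);
      if (buckets.containsKey(prefix)) {
        return prefix;
      }
    }
    return null;
  }

  // Split an over-full leaf into prefix+"0" and prefix+"1"; repeat while a child is too full.
  private void split(String prefix) {
    List<String[]> entries = buckets.get(prefix);
    if (entries.size() <= MAX_PER_LEAF) return;
    if (prefix.length() >= entries.get(0)[0].length()) return; // no bits left to split on
    buckets.remove(prefix);
    List<String[]> zero = new ArrayList<>();
    List<String[]> one = new ArrayList<>();
    for (String[] e : entries) {
      (e[0].charAt(prefix.length()) == '0' ? zero : one).add(e);
    }
    buckets.put(prefix + "0", zero); // both children are created, even if one is empty
    buckets.put(prefix + "1", one);
    split(prefix + "0");
    split(prefix + "1");
  }

  // Pre-order walk of the prefix space; a miss means "internal node, look deeper".
  List<String> readSorted() {
    List<String> out = new ArrayList<>();
    if (!buckets.isEmpty()) walk("", out);
    return out;
  }

  private void walk(String prefix, List<String> out) {
    List<String[]> entries = buckets.get(prefix);
    if (entries == null) {
      walk(prefix + "0", out);
      walk(prefix + "1", out);
      return;
    }
    List<String[]> sorted = new ArrayList<>(entries);
    sorted.sort((a, b) -> a[0].compareTo(b[0])); // sort within the leaf by sort key
    for (String[] e : sorted) out.add(e[1]);
  }
}
```

Inserting (0010, value1), (0011, value2), (1011, value3), (0001, value4) in that order ends with leaves 000, 001, 01 and 1, and readSorted() visits "", 0, 00 (misses), then 000, 001, 01, 1, the same probe order as in the walkthrough.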
>>>> > >>>>>>>>>>>
>>>> > >>>>>>>>>>>
>>>> > >>>>>>>>>>>
>>>> > >>>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <
>>>> relax@google.com>
>>>> > >>>>>>>>>>> wrote:
>>>> > >>>>>>>>>>>
>>>> > >>>>>>>>>>>> Some great comments!
>>>> > >>>>>>>>>>>>
>>>> > >>>>>>>>>>>> *Aljoscha*: absolutely this would have to be implemented
>>>> by
>>>> > >>>>>>>>>>>> runners to be efficient. We can of course provide a
>>>> default (inefficient)
>>>> > >>>>>>>>>>>> implementation, but ideally runners would provide better
>>>> ones.
>>>> > >>>>>>>>>>>>
>>>> > >>>>>>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed
>>>> by
>>>> > >>>>>>>>>>>> this. E.g.
>>>> > >>>>>>>>>>>>
>>>> > >>>>>>>>>>>> *Robert* Great point about standard coders not satisfying
>>>> > >>>>>>>>>>>> this. That's why I suggested that we provide a way to
>>>> tag the coders that
>>>> > >>>>>>>>>>>> do preserve order, and only accept those as key coders
>>>> Alternatively we
>>>> > >>>>>>>>>>>> could present a more limited API - e.g. only allowing a
>>>> hard-coded set of
>>>> > >>>>>>>>>>>> types to be used as keys - but that seems counter to the
>>>> direction Beam
>>>> > >>>>>>>>>>>> usually goes. So users will have two ways of creating
>>>> multimap state specs:
>>>> > >>>>>>>>>>>>
>>>> > >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>>
>>>> state =
>>>> > >>>>>>>>>>>> StateSpecs.multimap(VarLongCoder.of(),
>>>> StringUtf8Coder.of());
>>>> > >>>>>>>>>>>>
>>>> > >>>>>>>>>>>> or
>>>> > >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>>
>>>> state =
>>>> > >>>>>>>>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(),
>>>> StringUtf8Coder.of());
>>>> > >>>>>>>>>>>>
>>>> > >>>>>>>>>>>> The second one will validate that the key coder preserves
>>>> > >>>>>>>>>>>> order, and fails otherwise (similar to coder determinism
>>>> checking in
>>>> > >>>>>>>>>>>> GroupByKey). (BTW we would also have versions of these
>>>> functions that use
>>>> > >>>>>>>>>>>> coder inference to "guess" the coder, but those will do
>>>> the same checking)
>>>> > >>>>>>>>>>>>
>>>> > >>>>>>>>>>>> Also the API I proposed did support random access! We
>>>> could
>>>> > >>>>>>>>>>>> separate out OrderedBagState again if we think the use
>>>> cases are
>>>> > >>>>>>>>>>>> fundamentally different. I merged the proposal into that
>>>> of MultimapState
>>>> > >>>>>>>>>>>> because there seemed to be 99% overlap.
>>>> > >>>>>>>>>>>>
>>>> > >>>>>>>>>>>> Reuven
>>>> > >>>>>>>>>>>>
>>>> > >>>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <
>>>> > >>>>>>>>>>>> robertwb@google.com> wrote:
>>>> > >>>>>>>>>>>>
>>>> > >>>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <
>>>> relax@google.com>
>>>> > >>>>>>>>>>>>> wrote:
>>>> > >>>>>>>>>>>>> >
>>>> > >>>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <
>>>> > >>>>>>>>>>>>> altay@google.com> wrote:
>>>> > >>>>>>>>>>>>> >>
>>>> > >>>>>>>>>>>>> >>
>>>> > >>>>>>>>>>>>> >>
>>>> > >>>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <
>>>> > >>>>>>>>>>>>> lcwik@google.com> wrote:
>>>> > >>>>>>>>>>>>> >>>
>>>> > >>>>>>>>>>>>> >>>
>>>> > >>>>>>>>>>>>> >>>
>>>> > >>>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <
>>>> > >>>>>>>>>>>>> ruwang@google.com> wrote:
>>>> > >>>>>>>>>>>>> >>>>>
>>>> > >>>>>>>>>>>>> >>>>> A few obvious problems with this code:
>>>> > >>>>>>>>>>>>> >>>>>   1. Removing the elements already processed from
>>>> the
>>>> > >>>>>>>>>>>>> bag requires clearing and rewriting the entire bag.
>>>> This is O(n^2) in the
>>>> > >>>>>>>>>>>>> number of input trades.
>>>> > >>>>>>>>>>>>> >>>>
>>>> > >>>>>>>>>>>>> >>>> Why isn't it O(2 * n) to clear and rewrite the
>>>> trade
>>>> > >>>>>>>>>>>>> state?
>>>> > >>>>>>>>>>>>> >>>>
>>>> > >>>>>>>>>>>>> >>>>>
>>>> > >>>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V>
>>>> extends State
>>>> > >>>>>>>>>>>>> {
>>>> > >>>>>>>>>>>>> >>>>>   // Add a value to the map.
>>>> > >>>>>>>>>>>>> >>>>>   void put(K key, V value);
>>>> > >>>>>>>>>>>>> >>>>>   // Get all values for a given key.
>>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>>>> > >>>>>>>>>>>>> >>>>>  // Return all entries in the map.
>>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>>> > >>>>>>>>>>>>> >>>>>   // Return all entries in the map with keys <=
>>>> limit.
>>>> > >>>>>>>>>>>>> Returned elements are sorted by the key.
>>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K
>>>> > >>>>>>>>>>>>> limit);
>>>> > >>>>>>>>>>>>> >>>>>
>>>> > >>>>>>>>>>>>> >>>>>  // Remove all values with the given key;
>>>> > >>>>>>>>>>>>> >>>>>   void remove(K key);
>>>> > >>>>>>>>>>>>> >>>>>  // Remove all entries in the map with keys <=
>>>> limit.
>>>> > >>>>>>>>>>>>> >>>>>   void removeUntil(K limit);
>>>> > >>>>>>>>>>>>> >>>>
>>>> > >>>>>>>>>>>>> >>>> Would removeUntilExcl(K limit) also be useful? It would
>>>> remove
>>>> > >>>>>>>>>>>>> all entries in the map with keys < limit.
>>>> > >>>>>>>>>>>>> >>>>
>>>> > >>>>>>>>>>>>> >>>>>
>>>> > >>>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of
>>>> the key.
>>>> > >>>>>>>>>>>>> In order to make this easier for users, I propose that
>>>> we introduce a new
>>>> > >>>>>>>>>>>>> tag on Coders PreservesOrder. A Coder that contains
>>>> this tag guarantees
>>>> > >>>>>>>>>>>>> that the encoded value preserves the same ordering as
>>>> the base Java type.
>>>> > >>>>>>>>>>>>> >>>>
>>>> > >>>>>>>>>>>>> >>>>
>>>> > >>>>>>>>>>>>> >>>> Could you clarify what is  "encoded value
>>>> preserves the
>>>> > >>>>>>>>>>>>> same ordering as the base Java type"?
>>>> > >>>>>>>>>>>>> >>>
>>>> > >>>>>>>>>>>>> >>>
>>>> > >>>>>>>>>>>>> >>> Let's say A and B represent two different instances
>>>> of the
>>>> > >>>>>>>>>>>>> same Java type like a double, then A < B (using the
>>>> language's comparison
>>>> > >>>>>>>>>>>>> operator) iff encode(A) < encode(B) (note the encoded
>>>> versions are compared
>>>> > >>>>>>>>>>>>> lexicographically)
>>>> > >>>>>>>>>>>>> >>
>>>> > >>>>>>>>>>>>> >>
>>>> > >>>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect A
>>>> < B iff
>>>> > >>>>>>>>>>>>> e(A) < e(B) property to hold for all languages we
>>>> support? What happens if A,
>>>> > >>>>>>>>>>>>> B sort differently in different languages?
>>>> > >>>>>>>>>>>>> >
>>>> > >>>>>>>>>>>>> >
>>>> > >>>>>>>>>>>>> > That would have to be the property of the coder
>>>> (which means
>>>> > >>>>>>>>>>>>> that this property probably needs to be represented in
>>>> the portability
>>>> > >>>>>>>>>>>>> representation of the coder). I imagine the common use
>>>> cases will be for
>>>> > >>>>>>>>>>>>> simple coders like int, long, string, etc., which are
>>>> likely to sort the
>>>> > >>>>>>>>>>>>> same in most languages.
>>>> > >>>>>>>>>>>>>
>>>> > >>>>>>>>>>>>> The standard coders for both double and integral types
>>>> do not
>>>> > >>>>>>>>>>>>> respect
>>>> > >>>>>>>>>>>>> the natural ordering (consider negative values). KV
>>>> coders
>>>> > >>>>>>>>>>>>> violate the
>>>> > >>>>>>>>>>>>> "natural" lexicographic ordering on components as well.
>>>> I think
>>>> > >>>>>>>>>>>>> implicitly sorting on encoded value would yield many
>>>> > >>>>>>>>>>>>> surprises. (The
>>>> > >>>>>>>>>>>>> state, of course, could take an order-preserving, bytes
>>>> > >>>>>>>>>>>>> (string?)-producing callable as a parameter).
>>>> (As for
>>>> > >>>>>>>>>>>>> naming, I'd probably call this OrderedBagState or
>>>> something
>>>> > >>>>>>>>>>>>> like
>>>> > >>>>>>>>>>>>> that...rather than Map which tends to imply random
>>>> access.)
>>>> > >>>>>>>>>>>>>
>>>> > >>>>>>>>>>>>
>>>> >
>>>>
>>>

Re: [External] Re: DISCUSS: Sorted MapState API

Posted by Kenneth Knowles <ke...@apache.org>.
On Wed, Sep 16, 2020 at 8:48 AM Tyson Hamilton <ty...@google.com> wrote:

> The use case is to support an unbounded stream-stream join, where the
> elements are arriving in roughly time sorted order. Removing a specific
> element from the timestamp indexed collection is necessary when a match is
> found.
>

Just checking - this is an optimization when you already know that the join
is 1:1?

Kenn


> Having clearRange is helpful to expire elements that are no longer
> relevant according to a user-provided time based join predicate (e.g. WHEN
> ABS(leftElement.timestamp - rightElement.timestamp) < 5 minutes).
>
> I'll think a bit more on how to use MapState instead if having a remove()
> like method for a single element isn't an option.
>
> On Tue, Sep 15, 2020 at 8:52 PM Reuven Lax <re...@google.com> wrote:
>
>> Hi,
>>
>> Currently we only support removing a timestamp range. You can remove a
>> single timestamp of course by removing [ts, ts+1), however if there are
>> multiple elements with the same timestamp this will remove all of those
>> elements.
>>
>> Does this fit your use case? If not, I wonder if MapState is closer to
>> what you are looking for?
>>
>> Reuven
>>
>> On Tue, Sep 15, 2020 at 2:33 PM Tyson Hamilton <ty...@google.com>
>> wrote:
>>
>>> Hi Reuven,
>>>
>>> I noticed that there was an implementation of the in-memory
>>> OrderedListState introduced [1]. Where can I find out more regarding the
>>> plan and design? Is there a design doc? I'd like to know more details about
>>> the implementation to see if it fits my use case. I was hoping it would
>>> have a remove(TimestampedValue<T> e) method.
>>>
>>> Thanks,
>>> -Tyson
>>>
>>>
>>> [1]:
>>> https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41
>>>
>>>
>>> On 2020/08/03 21:41:46, Catlyn Kong <ca...@yelp.com> wrote:
>>> > Hey folks,
>>> >
>>> > Sry I'm late to this thread but this might be very helpful for the
>>> problem
>>> > we're dealing with. Do we have a design doc or a jira ticket I can
>>> follow?
>>> >
>>> > Cheers,
>>> > Catlyn
>>> >
>>> > On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský <je...@seznam.cz> wrote:
>>> >
>>> > > My questions were just an example. I fully agree there is a
>>> fundamental
>>> > > need for a sorted state (of some form, and I also think this links to
>>> > > efficient implementation of retractions) - I was reacting to Kenn's
>>> question
>>> > > about BIP. This one would be pretty nice example why it would be
>>> good to
>>> > > have such a "process" - not everything can be solved on ML and there
>>> are
>>> > > fundamental decisions that might need a closer attention.
>>> > > On 6/18/20 5:28 PM, Reuven Lax wrote:
>>> > >
>>> > > Jan - my proposal is exactly TimeSortedBagState (more accurately -
>>> > > TimeSortedListState), though I went a bit further and also proposed
>>> a way
>>> > > to have a dynamic number of tagged TimeSortedBagStates.
>>> > >
>>> > > You are correct that the runner doesn't really have to store the
>>> data time
>>> > > sorted - what's actually needed is the ability to fetch and remove
>>> > > timestamp ranges of data (though that does include fetching the
>>> entire
>>> > > list); TimeOrderedState is probably a more accurate name than
>>> > > TimeSortedState. I don't think we could get away with operations
>>> that only
>>> > > act on the smallest timestamp, however we could limit the API to
>>> only being
>>> > > able to fetch and remove prefixes of data (ordered by timestamp).
>>> However
>>> > > if we support prefixes, we might as well support arbitrary subranges.
>>> > >
>>> > > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <je...@seznam.cz>
>>> wrote:
>>> > >
>>> > >> Big +1 for a BIP, as this might really help clarify all the pros
>>> and cons
>>> > >> of all possibilities. There seem to be questions that need
>>> answering and
>>> > >> motivating use cases - do we need sorted map state or can we solve
>>> our use
>>> > >> cases by something simpler - e.g. the mentioned TimeSortedBagState?
>>> Does
>>> > >> that really have to be time-sorted structure, or does it "only"
>>> have to
>>> > >> have operations that can efficiently find and remove element with
>>> smallest
>>> > >> timestamp (like a PriorityQueue)?
>>> > >>
>>> > >> Jan
>>> > >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
>>> > >>
>>> > >> Zooming in from generic philosophy to be clear: adding time ordered
>>> > >> buffer to the Fn state API is *not* a shortcut. It has benefits that
>>> will
>>> > >> not be achieved by SDK-side implementation on top of either ordered
>>> or
>>> > >> unordered multimap. Are those benefits worth expanding the API? I
>>> don't
>>> > >> know.
>>> > >>
>>> > >> A change to allow a runner to have a specialized implementation for
>>> > >> time-buffered state would be one or more StateKey types, right?
>>> Reuven,
>>> > >> maybe put this and your Java API in a doc? A BIP? Seems like
>>> there's at
>>> > >> least the following to explore:
>>> > >>
>>> > >>  - how that Java API would map to an SDK-side implementation on top
>>> of
>>> > >> multimap state key
>>> > >>  - how that Java API would map to a new StateKey
>>> > >>  - whether there's actually more than one relevant implementation
>>> of that
>>> > >> StateKey
>>> > >>  - whether SDK-side implementation on some other state key would be
>>> > >> performant enough in all SDK languages (present and future)
>>> > >>
>>> > >> Zooming back out to generic philosophy: Proliferation of StateKey
>>> > >> types tuned by runners (which can very easily still share
>>> implementation)
>>> > >> is probably better than proliferation of complex SDK-side
>>> implementations
>>> > >> with varying completeness and performance.
>>> > >>
>>> > >> Kenn
>>> > >>
>>> > >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <re...@google.com>
>>> wrote:
>>> > >>
>>> > >>> It might help for me to describe what I have in mind. I'm still
>>> > >>> proposing that we build multimap, just not a globally-sorted
>>> multimap.
>>> > >>>
>>> > >>> My previous proposal was that we provide a Multimap<Key, Value>
>>> state
>>> > >>> type, sorted by key. This would have two additional operations -
>>> > >>> multimap.getRange(startKey, endKey) and
>>> multimap.deleteRange(startKey,
>>> > >>> endKey). The primary use case was timestamp sorting, but I felt
>>> that a
>>> > >>> sorted multimap provided a nice generalization - after all, you
>>> can simply
>>> > >>> key the multimap by timestamp to get timestamp sorting.
>>> > >>>
>>> > >>> This approach had some issues immediately that would take some
>>> work to
>>> > >>> solve. Since a multimap key can have any type and a runner will
>>> only be
>>> > >>> able to sort by encoded type, we would need to introduce a concept
>>> of
>>> > >>> order-preserving coders into Beam and plumb that through. Robert
>>> pointed
>>> > >>> out that even our existing standard coders for simple integral
>>> types don't
>>> > >>> preserve order, so there will likely be surprises here.
>>> > >>>
>>> > >>> My current proposal is for a multimap that is not sorted by key,
>>> but
>>> > >>> that can support ordered values for a single key. Remember that a
>>> multimap
>>> > >>> maps K -> Iterable<V>, so this means that each individual
>>> Iterable<V> is
>>> > >>> ordered, but the keys have no specific order relative to each
>>> other. This
>>> > >>> is not too different from many multimap implementations where the
>>> keys are
>>> > >>> unordered, but the list of values for a single key at least has a
>>> stable
>>> > >>> order.
>>> > >>>
>>> > >>> The interface would look like this:
>>> > >>>
>>> > >>> public interface MultimapState<K, V> extends State {
>>> > >>>   // Add a value with a default timestamp.
>>> > >>>   void put(K key, V value);
>>> > >>>
>>> > >>>   // Add a timestamped value.
>>> > >>>   void put(K key, TimestampedValue<V> value);
>>> > >>>
>>> > >>>   // Remove all values for a key.
>>> > >>>   void remove(K key);
>>> > >>>
>>> > >>>   // Remove all values for a key with timestamps within the
>>> specified
>>> > >>> range.
>>> > >>>   void removeRange(K key, Instant startTs, Instant endTs);
>>> > >>>
>>> > >>>   // Get an Iterable of values for V. The Iterable will be returned
>>> > >>> sorted by timestamp.
>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
>>> > >>>
>>> > >>>   // Get an Iterable of values for V in the specified range. The
>>> > >>> Iterable will be returned sorted by timestamp.
>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> getRange(K key,
>>> Instant
>>> > >>> startTs, Instant endTs);
>>> > >>>
>>> > >>>   ReadableState<Iterable<K>> keys();
>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> values();
>>> > >>>   ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
>>> > >>> }
>>> > >>>
>>> > >>> We can of course provide helper functions that allow using
>>> MultimapState
>>> > >>> without dealing with TimestampedValue for users who only want a
>>> multimap and
>>> > >>> don't want sorting.
>>> > >>>
>>> > >>> I think many users will only need a single sorted list - not a full
>>> > >>> multimap. It's worth offering this as well, and we can simply
>>> build it on
>>> > >>> top of MultimapState. It will look like an extension of BagState
>>> > >>>
>>> > >>> public interface TimestampSortedListState<T> extends State {
>>> > >>>   void add(TimestampedValue<T> value);
>>> > >>>   Iterable<TimestampedValue<T>> read();
>>> > >>>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant
>>> > >>> endTs);
>>> > >>>   void clearRange(Instant startTs, Instant endTs);
>>> > >>> }
>>> > >>>
>>> > >>>
>>> > >>> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <lc...@google.com>
>>> wrote:
>>> > >>>
>>> > >>>> The portability layer is meant to live across multiple versions
>>> of Beam
>>> > >>>> and I don't think it should be treated by doing the simple and
>>> useful thing
>>> > >>>> now since I believe it will lead to a proliferation of the API.
>>> > >>>>
>>> > >>>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <ke...@apache.org>
>>> > >>>> wrote:
>>> > >>>>
>>> > >>>>> I have thoughts on the subject of whether to have APIs just for
>>> the
>>> > >>>>> lowest-level building blocks versus having APIs for higher-level
>>> > >>>>> constructs. Specifically this applies to providing only unsorted
>>> multimap
>>> > >>>>> vs what I will call "time-ordered buffer". TL;DR: I'd vote to
>>> focus on
>>> > >>>>> time-ordered buffer; if it turns out to be easy to go all the
>>> way to sorted
>>> > >>>>> multimap that's nice-to-have; if it turns out to be easy to
>>> implement on
>>> > >>>>> top of unsorted map state that should probably be under the hood
>>> > >>>>>
>>> > >>>>> Reasons to build low-level multimap in the runner & fn api and
>>> layer
>>> > >>>>> higher-level things in the SDK:
>>> > >>>>>
>>> > >>>>>  - It is less implementation for runners if they have to only
>>> provide
>>> > >>>>> fewer lower-level building blocks like multimap state.
>>> > >>>>>  - There are many more runners than SDKs (and will be even more
>>> and
>>> > >>>>> more) so this saves overall.
>>> > >>>>>
>>> > >>>>> Reasons to build higher-level constructs directly in the runner
>>> and fn
>>> > >>>>> api:
>>> > >>>>>
>>> > >>>>>  - Having multiple higher-level state types may actually be less
>>> > >>>>> implementation than one complex state type, especially if they
>>> map to
>>> > >>>>> runner primitives.
>>> > >>>>>  - The runner may have better specialized implementations,
>>> especially
>>> > >>>>> for something like a time-ordered buffer.
>>> > >>>>>  - The particular access patterns in an SDK-based implementation
>>> may
>>> > >>>>> not be ideal for each runner's underlying implementation of the
>>> low-level
>>> > >>>>> building block.
>>> > >>>>>  - There may be excessive gRPC overhead even for optimal access
>>> > >>>>> patterns.
>>> > >>>>>
>>> > >>>>> There are ways to have best of both worlds, like:
>>> > >>>>>
>>> > >>>>> 1. Define multiple state types according to fundamental access
>>> > >>>>> patterns, like we did this before portability.
>>> > >>>>> 2. If it is easy to layer one on top of the other, do that
>>> inside the
>>> > >>>>> runner. Provide shared code so for runners providing the
>>> lowest-level
>>> > >>>>> primitive they get all the types for free.
>>> > >>>>>
>>> > >>>>> I understand that this is an oversimplification. It still
>>> creates some
>>> > >>>>> more work. And APIs are a burden so it is good to introduce as
>>> few as
>>> > >>>>> possible for maintenance. But it has performance benefits and
>>> also unblocks
>>> > >>>>> "just doing the simple and useful thing now" which I always like
>>> to do as
>>> > >>>>> long as it is compatible with future changes. If the APIs are
>>> fundamental,
>>> > >>>>> like sets, maps, timestamp ordering, then it is safe to guess
>>> that they
>>> > >>>>> will change rarely and be useful forever.
>>> > >>>>>
>>> > >>>>> Kenn
>>> > >>>>>
>>> > >>>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <lc...@google.com>
>>> wrote:
>>> > >>>>>
>>> > >>>>>> I would be glad to take a stab at how to provide sorting on top
>>> of
>>> > >>>>>> unsorted multimap state.
>>> > >>>>>> Based upon your description, you want integer keys representing
>>> > >>>>>> timestamps and arbitrary user value for the values, is that
>>> correct?
>>> > >>>>>> What kinds of operations do you need on the sorted map state in
>>> order
>>> > >>>>>> of efficiency requirements?
>>> > >>>>>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)),
>>> ClearAll(Range[x, y))
>>> > >>>>>> What kinds of operations do we expect the underlying unsorted
>>> map
>>> > >>>>>> state to be able to provide?
>>> > >>>>>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g.
>>> > >>>>>> enumerate(K)?)
>>> > >>>>>>
>>> > >>>>>> I went through a similar exercise of how to provide a list like
>>> side
>>> > >>>>>> input view over a multimap[1] side input which efficiently
>>> allowed
>>> > >>>>>> computation of size and provided random access while only
>>> having access to
>>> > >>>>>> get(K) and enumerate K's.
>>> > >>>>>>
>>> > >>>>>> 1:
>>> > >>>>>>
>>> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>>> > >>>>>>
>>> > >>>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <re...@google.com>
>>> wrote:
>>> > >>>>>>
>>> > >>>>>>> Bringing this subject up again,
>>> > >>>>>>>
>>> > >>>>>>> I've spent some time looking into implementing this for the
>>> Dataflow
>>> > >>>>>>> runner. I'm unable to find a way to implement the arbitrary
>>> sorted multimap
>>> > >>>>>>> efficiently for the case where there are large numbers of
>>> unique keys.
>>> > >>>>>>> Since the primary driving use case is timestamp ordering (i.e.
>>> key is event
>>> > >>>>>>> timestamp), you would expect to have nearly a new key per
>>> element. I
>>> > >>>>>>> considered Luke's suggestion above, but unfortunately it
>>> doesn't really
>>> > >>>>>>> solve this issue.
>>> > >>>>>>>
>>> > >>>>>>> The primary use case for sorting always seems to be sorting by
>>> > >>>>>>> timestamp. I want to propose that instead of building the
>>> fully-general
>>> > >>>>>>> sorted multimap, we instead focus on a state type where the
>>> sort key is an
>>> > >>>>>>> integral type (like a timestamp or an integer). There is still
>>> a valid use
>>> > >>>>>>> case for multimap, but we can provide that as an unordered
>>> state. At least
>>> > >>>>>>> for Dataflow, it will be much easier
>>> > >>>>>>>
>>> > >>>>>>> While my difficulties here may be specific to the Dataflow
>>> runner,
>>> > >>>>>>> any such support would have to be built into other runners as
>>> well, and
>>> > >>>>>>> limiting to integral sorting likely makes it easier for other
>>> runners to
>>> > >>>>>>> implement this. Also, if you look at this
>>> > >>>>>>> <
>>> https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>
>>> Flink
>>> > >>>>>>> comment pointed out by Aljoscha, for Flink the main use case
>>> identified was
>>> > >>>>>>> also timestamp sorting. This will also simplify the API design
>>> for this
>>> > >>>>>>> feature: Sorted multimap with arbitrary keys would require us
>>> to introduce
>>> > >>>>>>> a way of mapping natural ordering to encoded ordering (i.e. a
>>> new
>>> > >>>>>>> OrderPreservingCoder), but if we limit sort keys to integral
>>> types, the API
>>> > >>>>>>> design is simpler as integral types can be represented
>>> directly.
>>> > >>>>>>>
>>> > >>>>>>> Reuven
>>> > >>>>>>>
>>> > >>>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <re...@google.com>
>>> wrote:
>>> > >>>>>>>
>>> > >>>>>>>> This sounds to me like a potential runner strategy. However
>>> if a
>>> > >>>>>>>> runner can natively support sorted maps (e.g. we expect the
>>> Dataflow runner
>>> > >>>>>>>> to be able to do so, and I think it would be useful for other
>>> runners as
>>> > >>>>>>>> well), then it's probably preferable to allow the runner to
>>> use its native
>>> > >>>>>>>> capabilities.
>>> > >>>>>>>>
>>> > >>>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <
>>> lcwik@google.com>
>>> > >>>>>>>> wrote:
>>> > >>>>>>>>
>>> > >>>>>>>>> For the API that you proposed, the map key is always "void"
>>> and
>>> > >>>>>>>>> the sort key == user key. So in my example of
>>> > >>>>>>>>> key: dummy value
>>> > >>>>>>>>> key.000: token, (0001, value4)
>>> > >>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>> key.01: token
>>> > >>>>>>>>> key.1: token, (1011, value3)
>>> > >>>>>>>>> you would have:
>>> > >>>>>>>>> "void": dummy value
>>> > >>>>>>>>> "void".000: token, (0001, value4)
>>> > >>>>>>>>> "void".001: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>> "void".01: token
>>> > >>>>>>>>> "void".1: token, (1011, value3)
>>> > >>>>>>>>>
>>> > >>>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into
>>> walking
>>> > >>>>>>>>> the prefixes until you find a common prefix for K and
>>> then filter for
>>> > >>>>>>>>> values where they have a sort key <= K. Using the example
>>> above, to find
>>> > >>>>>>>>> entriesUntil(0010) you would:
>>> > >>>>>>>>> look for key."", miss
>>> > >>>>>>>>> look for key.0, miss
>>> > >>>>>>>>> look for key.00, miss
>>> > >>>>>>>>> look for key.000, hit, sort all contained values using
>>> secondary
>>> > >>>>>>>>> key, provide value4 to user
>>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010
>>> so we
>>> > >>>>>>>>> sort all contained values using secondary key, filter out
>>> value2 and
>>> > >>>>>>>>> provide value1
>>> > >>>>>>>>>
>>> > >>>>>>>>> void removeUntil(K limit) also translates into walking the
>>> > >>>>>>>>> prefixes but instead we will clear them when we have a "hit"
>>> with some
>>> > >>>>>>>>> special logic for when the sort key is a prefix of the key.
>>> Using the
>>> > >>>>>>>>> example, to removeUntil(0010) you would:
>>> > >>>>>>>>> look for key."", miss
>>> > >>>>>>>>> look for key.0, miss
>>> > >>>>>>>>> look for key.00, miss
>>> > >>>>>>>>> look for key.000, hit, clear
>>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010
>>> so we
>>> > >>>>>>>>> sort all contained values using secondary key, store in
>>> memory all values
>>> > >>>>>>>>> that are > 0010, clear, and append the values stored in memory.
>>> > >>>>>>>>>
>>> > >>>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <
>>> relax@google.com>
>>> > >>>>>>>>> wrote:
>>> > >>>>>>>>>
>>> > >>>>>>>>>> Can you explain how fetching and deleting ranges of keys
>>> would
>>> > >>>>>>>>>> work with this data structure?
>>> > >>>>>>>>>>
>>> > >>>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <
>>> lcwik@google.com>
>>> > >>>>>>>>>> wrote:
>>> > >>>>>>>>>>
>>> > >>>>>>>>>>> Reuven, for the example, I assume that we never want to
>>> store
>>> > >>>>>>>>>>> more than 2 values at a given sort key prefix, and if we
>>> do then we will
>>> > >>>>>>>>>>> create a new longer prefix splitting up the values based
>>> upon the sort key.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> Tuple representation in examples below is (key, sort key,
>>> value)
>>> > >>>>>>>>>>> and . is a character outside of the alphabet which can be
>>> represented by
>>> > >>>>>>>>>>> using an escaping encoding that wraps the key + sort key
>>> encoding.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> To insert (key, 0010, value1), we lookup "key" + all the
>>> > >>>>>>>>>>> prefixes of 0010 finding one that is not empty. In this
>>> case its 0, so we
>>> > >>>>>>>>>>> append value to the map at key.0 ending up with (we also
>>> set the key to any
>>> > >>>>>>>>>>> dummy value to know that it contains values):
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key."": token, (0010, value1)
>>> > >>>>>>>>>>> Now we insert (key, 0011, value2), we again lookup "key" +
>>> all
>>> > >>>>>>>>>>> the prefixes of 0011, finding "", so we append value2 to
>>> key."" ending up
>>> > >>>>>>>>>>> with:
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key."": token, (0010, value1), (0011, value2)
>>> > >>>>>>>>>>> Now we insert (key, 1011, value3), we again lookup "key" +
>>> all
>>> > >>>>>>>>>>> the prefixes of 1011 finding "" but notice that it is
>>> full, so we partition
>>> > >>>>>>>>>>> all the values into two prefixes 0 and 1. We also clear
>>> the "" prefix
>>> > >>>>>>>>>>> ending up with:
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key.0: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>>>> key.1: token, (1011, value3)
>>> > >>>>>>>>>>> Now we insert (key, 0001, value4), we again lookup "key" +
>>> all
>>> > >>>>>>>>>>> the prefixes of the value finding 0 but notice that it is
>>> full, so we
>>> > >>>>>>>>>>> partition all the values into two prefixes 00 and 01 but
>>> notice this
>>> > >>>>>>>>>>> doesn't help us since 00 will be too full so we split 00
>>> again to 000, 001.
>>> > >>>>>>>>>>> We also clear the 0 prefix ending up with:
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key.000: token, (0001, value4)
>>> > >>>>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>>>> key.01: token
>>> > >>>>>>>>>>> key.1: token, (1011, value3)
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> We are effectively building a trie[1] where we only have
>>> values
>>> > >>>>>>>>>>> at the leaves and control how full each leaf can be. There
>>> are other trie
>>> > >>>>>>>>>>> representations like a radix tree that may be better.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> Looking up the values in sorted order for "key" would go
>>> like
>>> > >>>>>>>>>>> this:
>>> > >>>>>>>>>>> Is key set, yes
>>> > >>>>>>>>>>> look for key."", miss
>>> > >>>>>>>>>>> look for key.0, miss
>>> > >>>>>>>>>>> look for key.00, miss
>>> > >>>>>>>>>>> look for key.000, hit, sort all contained values using
>>> secondary
>>> > >>>>>>>>>>> key, provide value4 to user
>>> > >>>>>>>>>>> look for key.001, hit, sort all contained values using
>>> secondary
>>> > >>>>>>>>>>> key, provide value1 followed by value2 to user
>>> > >>>>>>>>>>> look for key.01, hit, empty, return no values to user
>>> > >>>>>>>>>>> look for key.1, hit, sort all contained values using
>>> secondary
>>> > >>>>>>>>>>> key, provide value3 to user
>>> > >>>>>>>>>>> we have walked the entire prefix space, signal end of
>>> iterable
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> Some notes for the above:
>>> > >>>>>>>>>>> * The dummy value is used to know that the key contains
>>> values
>>> > >>>>>>>>>>> and the token is to know whether there are any values
>>> deeper in the trie so
>>> > >>>>>>>>>>> we know when to stop searching.
>>> > >>>>>>>>>>> * If we can recalculate the sort key from the combination
>>> of the
>>> > >>>>>>>>>>> key and value, then we don't need to store it.
>>> > >>>>>>>>>>> * Keys with lots of values will perform worse than keys
>>> with
>>> > >>>>>>>>>>> fewer values since we have to look up more keys but they
>>> will be empty
>>> > >>>>>>>>>>> reads. The number of misses can be controlled by how many
>>> elements we are
>>> > >>>>>>>>>>> willing to store at a given node before we subdivide.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> In reality you could build a lot of structures (e.g. red
>>> black
>>> > >>>>>>>>>>> tree, binary tree) using the sort key, the issue is the
>>> cost of
>>> > >>>>>>>>>>> rebalancing/re-organizing the structure in map form and
>>> whether it has a
>>> > >>>>>>>>>>> convenient pre-order traversal for lookups.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <
>>> relax@google.com>
>>> > >>>>>>>>>>> wrote:
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>>> Some great comments!
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> *Aljoscha*: absolutely this would have to be implemented
>>> by
>>> > >>>>>>>>>>>> runners to be efficient. We can of course provide a
>>> default (inefficient)
>>> > >>>>>>>>>>>> implementation, but ideally runners would provide better
>>> ones.
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed
>>> by
>>> > >>>>>>>>>>>> this. E.g.
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> *Robert* Great point about standard coders not satisfying
>>> > >>>>>>>>>>>> this. That's why I suggested that we provide a way to tag
>>> the coders that
>>> > >>>>>>>>>>>> do preserve order, and only accept those as key coders. Alternatively we
>>> Alternatively we
>>> > >>>>>>>>>>>> could present a more limited API - e.g. only allowing a
>>> hard-coded set of
>>> > >>>>>>>>>>>> types to be used as keys - but that seems counter to the
>>> direction Beam
>>> > >>>>>>>>>>>> usually goes. So users will have two ways of creating
>>> multimap state specs:
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>>
>>> state =
>>> > >>>>>>>>>>>> StateSpecs.multimap(VarLongCoder.of(),
>>> StringUtf8Coder.of());
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> or
>>> > >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>>
>>> state =
>>> > >>>>>>>>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(),
>>> StringUtf8Coder.of());
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> The second one will validate that the key coder preserves
>>> > >>>>>>>>>>>> order, and fails otherwise (similar to coder determinism
>>> checking in
>>> > >>>>>>>>>>>> GroupByKey). (BTW we would also have versions of these
>>> functions that use
>>> > >>>>>>>>>>>> coder inference to "guess" the coder, but those will do
>>> the same checking)
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> Also the API I proposed did support random access! We
>>> could
>>> > >>>>>>>>>>>> separate out OrderedBagState again if we think the use
>>> cases are
>>> > >>>>>>>>>>>> fundamentally different. I merged the proposal into that
>>> of MultimapState
>>> > >>>>>>>>>>>> because there seemed to be 99% overlap.
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> Reuven
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <
>>> > >>>>>>>>>>>> robertwb@google.com> wrote:
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <
>>> relax@google.com>
>>> > >>>>>>>>>>>>> wrote:
>>> > >>>>>>>>>>>>> >
>>> > >>>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <
>>> > >>>>>>>>>>>>> altay@google.com> wrote:
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <
>>> > >>>>>>>>>>>>> lcwik@google.com> wrote:
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <
>>> > >>>>>>>>>>>>> ruwang@google.com> wrote:
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>> A few obvious problems with this code:
>>> > >>>>>>>>>>>>> >>>>>   1. Removing the elements already processed from
>>> the
>>> > >>>>>>>>>>>>> bag requires clearing and rewriting the entire bag. This
>>> is O(n^2) in the
>>> > >>>>>>>>>>>>> number of input trades.
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>> Why is it not O(2 * n) to clear and rewrite the
>>> trade
>>> > >>>>>>>>>>>>> state?
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends
>>> State
>>> > >>>>>>>>>>>>> {
>>> > >>>>>>>>>>>>> >>>>>   // Add a value to the map.
>>> > >>>>>>>>>>>>> >>>>>   void put(K key, V value);
>>> > >>>>>>>>>>>>> >>>>>   // Get all values for a given key.
>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>>> > >>>>>>>>>>>>> >>>>>  // Return all entries in the map.
>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>> > >>>>>>>>>>>>> >>>>>   // Return all entries in the map with keys <=
>>> limit.
>>> > >>>>>>>>>>>>> returned elements are sorted by the key.
>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K
>>> > >>>>>>>>>>>>> limit);
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>>  // Remove all values with the given key;
>>> > >>>>>>>>>>>>> >>>>>   void remove(K key);
>>> > >>>>>>>>>>>>> >>>>>  // Remove all entries in the map with keys <=
>>> limit.
>>> > >>>>>>>>>>>>> >>>>>   void removeUntil(K limit);
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>> Will removeUntilExcl(K limit) also be useful? It will
>>> remove
>>> > >>>>>>>>>>>>> all entries in the map with keys < limit.
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of
>>> the key.
>>> > >>>>>>>>>>>>> In order to make this easier for users, I propose that
>>> we introduce a new
>>> > >>>>>>>>>>>>> tag on Coders PreservesOrder. A Coder that contains this
>>> tag guarantees
>>> > >>>>>>>>>>>>> that the encoded value preserves the same ordering as
>>> the base Java type.
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>> Could you clarify what is  "encoded value preserves
>>> the
>>> > >>>>>>>>>>>>> same ordering as the base Java type"?
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>> Let's say A and B represent two different instances
>>> of the
>>> > >>>>>>>>>>>>> same Java type like a double, then A < B (using the
>>> language's comparison
>>> > >>>>>>>>>>>>> operator) iff encode(A) < encode(B) (note the encoded
>>> versions are compared
>>> > >>>>>>>>>>>>> lexicographically)
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect A <
>>> B iff
>>> > >>>>>>>>>>>>> e(A) < e(B) property to hold for all languages we
>>> support? What happens if A,
>>> > >>>>>>>>>>>>> B sort differently in different languages?
>>> > >>>>>>>>>>>>> >
>>> > >>>>>>>>>>>>> >
>>> > >>>>>>>>>>>>> > That would have to be the property of the coder (which
>>> means
>>> > >>>>>>>>>>>>> that this property probably needs to be represented in
>>> the portability
>>> > >>>>>>>>>>>>> representation of the coder). I imagine the common use
>>> cases will be for
>>> > >>>>>>>>>>>>> simple coders like int, long, string, etc., which are
>>> likely to sort the
>>> > >>>>>>>>>>>>> same in most languages.
>>> > >>>>>>>>>>>>>
>>> > >>>>>>>>>>>>> The standard coders for both double and integral types
>>> do not
>>> > >>>>>>>>>>>>> respect
>>> > >>>>>>>>>>>>> the natural ordering (consider negative values). KV
>>> coders
>>> > >>>>>>>>>>>>> violate the
>>> > >>>>>>>>>>>>> "natural" lexicographic ordering on components as well.
>>> I think
>>> > >>>>>>>>>>>>> implicitly sorting on encoded value would yield many
>>> > >>>>>>>>>>>>> surprises. (The
>>> > >>>>>>>>>>>>> state, of course, could take an order-preserving, bytes
>>> > >>>>>>>>>>>>> (string?)-producing callable as a parameter).
>>> (As for
>>> > >>>>>>>>>>>>> naming, I'd probably call this OrderedBagState or
>>> something
>>> > >>>>>>>>>>>>> like
>>> > >>>>>>>>>>>>> that...rather than Map which tends to imply random
>>> access.)
>>> > >>>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> >
>>>
>>
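
Robert's point above, that sorting on encoded bytes can disagree with the
natural ordering once negative values are involved, can be illustrated with a
small sketch. It assumes a fixed-width big-endian encoding of a long as a
stand-in; it is not Beam's coder API, and the class and method names
(OrderPreservingLongSketch, encodeBigEndian, encodeOrderPreserving,
compareEncoded) are made up for illustration. Flipping the sign bit before
encoding is one well-known way to make unsigned byte order agree with numeric
order; whether any Beam coder should do this is exactly the open question in
the thread.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Beam: shows why a runner that sorts on
// encoded bytes can disagree with the natural ordering of long keys.
final class OrderPreservingLongSketch {

  // Plain big-endian two's complement. Negative values start with a 1 bit,
  // so as unsigned bytes they compare greater than all non-negative values.
  static byte[] encodeBigEndian(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  // Flip the sign bit first. Long.MIN_VALUE maps to all-zero bytes and
  // Long.MAX_VALUE to all-0xFF bytes, so byte order matches numeric order.
  static byte[] encodeOrderPreserving(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
  }

  // Lexicographic comparison of unsigned bytes, i.e. what a store that only
  // sees encoded keys would use.
  static int compareEncoded(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

With the plain encoding, -1 sorts after 1; with the sign bit flipped, the
encoded order agrees with Long.compare for every pair of values.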

Re: [External] Re: DISCUSS: Sorted MapState API

Posted by Tyson Hamilton <ty...@google.com>.
The use case is to support an unbounded stream-stream join, where the
elements are arriving in roughly time sorted order. Removing a specific
element from the timestamp indexed collection is necessary when a match is
found. Having clearRange is helpful to expire elements that are no longer
relevant according to a user-provided time based join predicate (e.g. WHEN
ABS(leftElement.timestamp - rightElement.timestamp) < 5 minutes).

I'll think a bit more about how to use MapState instead, if having a
remove()-like method for a single element isn't an option.
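
To make the range semantics concrete, here is a minimal in-memory sketch of
a timestamp-ordered buffer. It is an assumption-laden illustration, not the
OrderedListState implementation referenced in [1]: the class name
TimeOrderedBufferSketch, the use of plain long timestamps, and the removeOne
helper are all hypothetical. It shows that clearing [ts, ts+1) drops every
element at that timestamp, and one possible way to emulate single-element
removal using only range read, range clear, and add.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a timestamp-ordered list state.
// Timestamps are plain longs (e.g. epoch millis) to keep the sketch small.
final class TimeOrderedBufferSketch<T> {
  private final TreeMap<Long, List<T>> byTimestamp = new TreeMap<>();

  void add(long ts, T value) {
    byTimestamp.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
  }

  // Returns values with startTs <= ts < endTs, ordered by timestamp.
  List<T> readRange(long startTs, long endTs) {
    List<T> out = new ArrayList<>();
    for (List<T> bucket : byTimestamp.subMap(startTs, endTs).values()) {
      out.addAll(bucket);
    }
    return out;
  }

  // Removes every value with startTs <= ts < endTs, including all
  // duplicates that share a timestamp.
  void clearRange(long startTs, long endTs) {
    byTimestamp.subMap(startTs, endTs).clear();
  }

  // Emulates removing a single element: read [ts, ts+1), clear that range,
  // then write back everything except the first match.
  boolean removeOne(long ts, T value) {
    List<T> sameTs = readRange(ts, ts + 1);
    if (!sameTs.remove(value)) {
      return false; // nothing to remove, state left untouched
    }
    clearRange(ts, ts + 1);
    for (T v : sameTs) {
      add(ts, v);
    }
    return true;
  }
}
```

The read-clear-rewrite in removeOne costs one read and one rewrite of the
whole timestamp bucket per removal, which is probably acceptable only when few
elements share a timestamp.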

On Tue, Sep 15, 2020 at 8:52 PM Reuven Lax <re...@google.com> wrote:

> Hi,
>
> Currently we only support removing a timestamp range. You can remove a
> single timestamp of course by removing [ts, ts+1), however if there are
> multiple elements with the same timestamp this will remove all of those
> elements.
>
> Does this fit your use case? If not, I wonder if MapState is closer to
> what you are looking for?
>
> Reuven
>
> On Tue, Sep 15, 2020 at 2:33 PM Tyson Hamilton <ty...@google.com> wrote:
>
>> Hi Reuven,
>>
>> I noticed that there was an implementation of the in-memory
>> OrderedListState introduced [1]. Where can I find out more regarding the
>> plan and design? Is there a design doc? I'd like to know more details about
>> the implementation to see if it fits my use case. I was hoping it would
>> have a remove(TimestampedValue<T> e) method.
>>
>> Thanks,
>> -Tyson
>>
>>
>> [1]:
>> https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41
>>
>>
>> On 2020/08/03 21:41:46, Catlyn Kong <ca...@yelp.com> wrote:
>> > Hey folks,
>> >
>> > Sry I'm late to this thread but this might be very helpful for the
>> problem
>> > we're dealing with. Do we have a design doc or a jira ticket I can
>> follow?
>> >
>> > Cheers,
>> > Catlyn
>> >
>> > On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > > My questions were just an example. I fully agree there is a
>> fundamental
>> > > need for a sorted state (of some form, and I also think this links to
>> > > efficient implementation of retractions) - I was reacting to Kenn's
>> question
>> > > about BIP. This one would be pretty nice example why it would be good
>> to
>> > > have such a "process" - not everything can be solved on ML and there
>> are
>> > > fundamental decisions that might need a closer attention.
>> > > On 6/18/20 5:28 PM, Reuven Lax wrote:
>> > >
>> > > Jan - my proposal is exactly TimeSortedBagState (more accurately -
>> > > TimeSortedListState), though I went a bit further and also proposed a
>> way
>> > > to have a dynamic number of tagged TimeSortedBagStates.
>> > >
>> > > You are correct that the runner doesn't really have to store the data
>> time
>> > > sorted - what's actually needed is the ability to fetch and remove
>> > > timestamp ranges of data (though that does include fetching the entire
>> > > list); TimeOrderedState is probably a more accurate name than
>> > > TimeSortedState. I don't think we could get away with operations that
>> only
>> > > act on the smallest timestamp, however we could limit the API to only
>> being
>> > > able to fetch and remove prefixes of data (ordered by timestamp).
>> However
>> > > if we support prefixes, we might as well support arbitrary subranges.
>> > >
>> > > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <je...@seznam.cz> wrote:
>> > >
>> > >> Big +1 for a BIP, as this might really help clarify all the pros and
>> cons
>> > >> of all possibilities. There seem to be questions that need answering
>> and
>> > >> motivating use cases - do we need sorted map state or can we solve
>> our use
>> > >> cases by something simpler - e.g. the mentioned TimeSortedBagState?
>> Does
>> > >> that really have to be time-sorted structure, or does it "only" have
>> to
>> > >> have operations that can efficiently find and remove element with
>> smallest
>> > >> timestamp (like a PriorityQueue)?
>> > >>
>> > >> Jan
>> > >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
>> > >>
>> > >> Zooming in from generic philosophy to be clear: adding time ordered
>> > >> buffer to the Fn state API is *not* a shortcut. It has benefits that
>> will
>> > >> not be achieved by SDK-side implementation on top of either ordered
>> or
>> > >> unordered multimap. Are those benefits worth expanding the API? I
>> don't
>> > >> know.
>> > >>
>> > >> A change to allow a runner to have a specialized implementation for
>> > >> time-buffered state would be one or more StateKey types, right?
>> Reuven,
>> > >> maybe put this and your Java API in a doc? A BIP? Seems like there's
>> at
>> > >> least the following to explore:
>> > >>
>> > >>  - how that Java API would map to an SDK-side implementation on top
>> of
>> > >> multimap state key
>> > >>  - how that Java API would map to a new StateKey
>> > >>  - whether there's actually more than one relevant implementation of
>> that
>> > >> StateKey
>> > >>  - whether SDK-side implementation on some other state key would be
>> > >> performant enough in all SDK languages (present and future)
>> > >>
>> > >> Zooming back out to generic philosophy: Proliferation of StateKey
>> > >> types tuned by runners (which can very easily still share
>> implementation)
>> > >> is probably better than proliferation of complex SDK-side
>> implementations
>> > >> with varying completeness and performance.
>> > >>
>> > >> Kenn
>> > >>
>> > >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <re...@google.com> wrote:
>> > >>
>> > >>> It might help for me to describe what I have in mind. I'm still
>> > >>> proposing that we build multimap, just not a globally-sorted
>> multimap.
>> > >>>
>> > >>> My previous proposal was that we provide a Multimap<Key, Value>
>> state
>> > >>> type, sorted by key. This would have two additional operations -
>> > >>> multimap.getRange(startKey, endKey) and
>> multimap.deleteRange(startKey,
>> > >>> endKey). The primary use case was timestamp sorting, but I felt
>> that a
>> > >>> sorted multimap provided a nice generalization - after all, you can
>> simply
>> > >>> key the multimap by timestamp to get timestamp sorting.
>> > >>>
>> > >>> This approach had some issues immediately that would take some work
>> to
>> > >>> solve. Since a multimap key can have any type and a runner will
>> only be
>> > >>> able to sort by encoded type, we would need to introduce a concept
>> of
>> > >>> order-preserving coders into Beam and plumb that through. Robert
>> pointed
>> > >>> out that even our existing standard coders for simple integral
>> types don't
>> > >>> preserve order, so there will likely be surprises here.
>> > >>>
>> > >>> My current proposal is for a multimap that is not sorted by key, but
>> > >>> that can support ordered values for a single key. Remember that a
>> multimap
>> > >>> maps K -> Iterable<V>, so this means that each individual
>> Iterable<V> is
>> > >>> ordered, but the keys have no specific order relative to each
>> other. This
>> > >>> is not too different from many multimap implementations where the
>> keys are
>> > >>> unordered, but the list of values for a single key at least has a
>> stable
>> > >>> order.
>> > >>>
>> > >>> The interface would look like this:
>> > >>>
>> > >>> public interface MultimapState<K, V> extends State {
>> > >>>   // Add a value with a default timestamp.
>> > >>>   void put(K key, V value);
>> > >>>
>> > >>>   // Add a timestamped value.
>> > >>>   void put(K key, TimestampedValue<V> value);
>> > >>>
>> > >>>   // Remove all values for a key.
>> > >>>   void remove(K key);
>> > >>>
>> > >>>   // Remove all values for a key with timestamps within the
>> specified
>> > >>> range.
>> > >>>   void removeRange(K key, Instant startTs, Instant endTs);
>> > >>>
>> > >>>   // Get an Iterable of values for V. The Iterable will be returned
>> > >>> sorted by timestamp.
>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
>> > >>>
>> > >>>   // Get an Iterable of values for V in the specified range. The
>> > >>> Iterable will be returned sorted by timestamp.
>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> getRange(K key,
>> Instant
>> > >>> startTs, Instant endTs);
>> > >>>
>> > >>>   ReadableState<Iterable<K>> keys();
>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> values();
>> > >>>   ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
>> > >>> }
>> > >>>
>> > >>> We can of course provide helper functions that allow using
>> MultimapState
>> > >>> without dealing with TimestampedValue for users who only want a
>> multimap and
>> > >>> don't want sorting.
>> > >>>
>> > >>> I think many users will only need a single sorted list - not a full
>> > >>> multimap. It's worth offering this as well, and we can simply build
>> it on
>> > >>> top of MultimapState. It will look like an extension of BagState
>> > >>>
>> > >>> public interface TimestampSortedListState<T> extends State {
>> > >>>   void add(TimestampedValue<T> value);
>> > >>>   Iterable<TimestampedValue<T>> read();
>> > >>>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant
>> > >>> endTs);
>> > >>>   void clearRange(Instant startTs, Instant endTs);
>> > >>> }
>> > >>>
>> > >>>
>> > >>> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <lc...@google.com> wrote:
>> > >>>
>> > >>>> The portability layer is meant to live across multiple versions of
>> Beam
>> > >>>> and I don't think it should be treated by doing the simple and
>> useful thing
>> > >>>> now since I believe it will lead to a proliferation of the API.
>> > >>>>
>> > >>>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <ke...@apache.org>
>> > >>>> wrote:
>> > >>>>
>> > >>>>> I have thoughts on the subject of whether to have APIs just for
>> the
>> > >>>>> lowest-level building blocks versus having APIs for higher-level
>> > >>>>> constructs. Specifically this applies to providing only unsorted
>> multimap
>> > >>>>> vs what I will call "time-ordered buffer". TL;DR: I'd vote to
>> focus on
>> > >>>>> time-ordered buffer; if it turns out to be easy to go all the way
>> to sorted
>> > >>>>> multimap that's nice-to-have; if it turns out to be easy to
>> implement on
>> > >>>>> top of unsorted map state that should probably be under the hood
>> > >>>>>
>> > >>>>> Reasons to build low-level multimap in the runner & fn api and
>> layer
>> > >>>>> higher-level things in the SDK:
>> > >>>>>
>> > >>>>>  - It is less implementation for runners if they have to only
>> provide
>> > >>>>> fewer lower-level building blocks like multimap state.
>> > >>>>>  - There are many more runners than SDKs (and will be even more
>> and
>> > >>>>> more) so this saves overall.
>> > >>>>>
>> > >>>>> Reasons to build higher-level constructs directly in the runner
>> and fn
>> > >>>>> api:
>> > >>>>>
>> > >>>>>  - Having multiple higher-level state types may actually be less
>> > >>>>> implementation than one complex state type, especially if they
>> map to
>> > >>>>> runner primitives.
>> > >>>>>  - The runner may have better specialized implementations,
>> especially
>> > >>>>> for something like a time-ordered buffer.
>> > >>>>>  - The particular access patterns in an SDK-based implementation
>> may
>> > >>>>> not be ideal for each runner's underlying implementation of the
>> low-level
>> > >>>>> building block.
>> > >>>>>  - There may be excessive gRPC overhead even for optimal access
>> > >>>>> patterns.
>> > >>>>>
>> > >>>>> There are ways to have best of both worlds, like:
>> > >>>>>
>> > >>>>> 1. Define multiple state types according to fundamental access
>> > >>>>> patterns, like we did before portability.
>> > >>>>> 2. If it is easy to layer one on top of the other, do that inside
>> the
>> > >>>>> runner. Provide shared code so for runners providing the
>> lowest-level
>> > >>>>> primitive they get all the types for free.
>> > >>>>>
>> > >>>>> I understand that this is an oversimplification. It still creates
>> some
>> > >>>>> more work. And APIs are a burden so it is good to introduce as
>> few as
>> > >>>>> possible for maintenance. But it has performance benefits and
>> also unblocks
>> > >>>>> "just doing the simple and useful thing now" which I always like
>> to do as
>> > >>>>> long as it is compatible with future changes. If the APIs are
>> fundamental,
>> > >>>>> like sets, maps, timestamp ordering, then it is safe to guess
>> that they
>> > >>>>> will change rarely and be useful forever.
>> > >>>>>
>> > >>>>> Kenn
>> > >>>>>
>> > >>>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <lc...@google.com>
>> wrote:
>> > >>>>>
>> > >>>>>> I would be glad to take a stab at how to provide sorting on top
>> of
>> > >>>>>> unsorted multimap state.
>> > >>>>>> Based upon your description, you want integer keys representing
>> > >>>>>> timestamps and arbitrary user value for the values, is that
>> correct?
>> > >>>>>> What kinds of operations do you need on the sorted map state in
>> order
>> > >>>>>> of efficiency requirements?
>> > >>>>>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)),
>> ClearAll(Range[x, y))
>> > >>>>>> What kinds of operations do we expect the underlying unsorted map
>> > >>>>>> state to be able to provide?
>> > >>>>>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g.
>> > >>>>>> enumerate(K)?)
>> > >>>>>>
>> > >>>>>> I went through a similar exercise of how to provide a list like
>> side
>> > >>>>>> input view over a multimap[1] side input which efficiently
>> allowed
>> > >>>>>> computation of size and provided random access while only having
>> access to
>> > >>>>>> get(K) and enumerate K's.
>> > >>>>>>
>> > >>>>>> 1:
>> > >>>>>>
>> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>> > >>>>>>
>> > >>>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <re...@google.com>
>> wrote:
>> > >>>>>>
>> > >>>>>>> Bringing this subject up again,
>> > >>>>>>>
>> > >>>>>>> I've spent some time looking into implementing this for the
>> Dataflow
>> > >>>>>>> runner. I'm unable to find a way to implement the arbitrary
>> sorted multimap
>> > >>>>>>> efficiently for the case where there are large numbers of
>> unique keys.
>> > >>>>>>> Since the primary driving use case is timestamp ordering (i.e.
>> key is event
>> > >>>>>>> timestamp), you would expect to have nearly a new key per
>> element. I
>> > >>>>>>> considered Luke's suggestion above, but unfortunately it
>> doesn't really
>> > >>>>>>> solve this issue.
>> > >>>>>>>
>> > >>>>>>> The primary use case for sorting always seems to be sorting by
>> > >>>>>>> timestamp. I want to propose that instead of building the
>> fully-general
>> > >>>>>>> sorted multimap, we instead focus on a state type where the
>> sort key is an
>> > >>>>>>> integral type (like a timestamp or an integer). There is still
>> a valid use
>> > >>>>>>> case for multimap, but we can provide that as an unordered
>> state. At least
>> > >>>>>>> for Dataflow, it will be much easier.
>> > >>>>>>>
>> > >>>>>>> While my difficulties here may be specific to the Dataflow
>> runner,
>> > >>>>>>> any such support would have to be built into other runners as
>> well, and
>> > >>>>>>> limiting to integral sorting likely makes it easier for other
>> runners to
>> > >>>>>>> implement this. Also, if you look at this
>> > >>>>>>> <
>> https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>
>> Flink
>> > >>>>>>> comment pointed out by Aljoscha, for Flink the main use case
>> identified was
>> > >>>>>>> also timestamp sorting. This will also simplify the API design
>> for this
>> > >>>>>>> feature: Sorted multimap with arbitrary keys would require us
>> to introduce
>> > >>>>>>> a way of mapping natural ordering to encoded ordering (i.e. a
>> new
>> > >>>>>>> OrderPreservingCoder), but if we limit sort keys to integral
>> types, the API
>> > >>>>>>> design is simpler as integral types can be represented directly.
>> > >>>>>>>
>> > >>>>>>> Reuven
>> > >>>>>>>
>> > >>>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <re...@google.com>
>> wrote:
>> > >>>>>>>
>> > >>>>>>>> This sounds to me like a potential runner strategy. However if
>> a
>> > >>>>>>>> runner can natively support sorted maps (e.g. we expect the
>> Dataflow runner
>> > >>>>>>>> to be able to do so, and I think it would be useful for other
>> runners as
>> > >>>>>>>> well), then it's probably preferable to allow the runner to
>> use its native
>> > >>>>>>>> capabilities.
>> > >>>>>>>>
>> > >>>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <lcwik@google.com
>> >
>> > >>>>>>>> wrote:
>> > >>>>>>>>
>> > >>>>>>>>> For the API that you proposed, the map key is always "void"
>> and
>> > >>>>>>>>> the sort key == user key. So in my example of
>> > >>>>>>>>> key: dummy value
>> > >>>>>>>>> key.000: token, (0001, value4)
>> > >>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>> > >>>>>>>>> key.01: token
>> > >>>>>>>>> key.1: token, (1011, value3)
>> > >>>>>>>>> you would have:
>> > >>>>>>>>> "void": dummy value
>> > >>>>>>>>> "void".000: token, (0001, value4)
>> > >>>>>>>>> "void".001: token, (0010, value1), (0011, value2)
>> > >>>>>>>>> "void".01: token
>> > >>>>>>>>> "void".1: token, (1011, value3)
>> > >>>>>>>>>
>> > >>>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into
>> walking
>> > >>>>>>>>> the prefixes until you find a common prefix for K and
>> then filter for
>> > >>>>>>>>> values where they have a sort key <= K. Using the example
>> above, to find
>> > >>>>>>>>> entriesUntil(0010) you would:
>> > >>>>>>>>> look for key."", miss
>> > >>>>>>>>> look for key.0, miss
>> > >>>>>>>>> look for key.00, miss
>> > >>>>>>>>> look for key.000, hit, sort all contained values using
>> secondary
>> > >>>>>>>>> key, provide value4 to user
>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so
>> we
>> > >>>>>>>>> sort all contained values using secondary key, filter out
>> value2 and
>> > >>>>>>>>> provide value1
>> > >>>>>>>>>
>> > >>>>>>>>> void removeUntil(K limit) also translates into walking the
>> > >>>>>>>>> prefixes but instead we will clear them when we have a "hit"
>> with some
>> > >>>>>>>>> special logic for when the sort key is a prefix of the key.
>> Using the
>> > >>>>>>>>> example, to removeUntil(0010) you would:
>> > >>>>>>>>> look for key."", miss
>> > >>>>>>>>> look for key.0, miss
>> > >>>>>>>>> look for key.00, miss
>> > >>>>>>>>> look for key.000, hit, clear
>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so
>> we
>> > >>>>>>>>> sort all contained values using secondary key, store in
>> memory all values
>> > >>>>>>>>> that are > 0010, clear, and append the values stored in memory.
>> > >>>>>>>>>
>> > >>>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <relax@google.com
>> >
>> > >>>>>>>>> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>>> Can you explain how fetching and deleting ranges of keys
>> would
>> > >>>>>>>>>> work with this data structure?
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <
>> lcwik@google.com>
>> > >>>>>>>>>> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>>> Reuven, for the example, I assume that we never want to
>> store
>> > >>>>>>>>>>> more than 2 values at a given sort key prefix, and if we do
>> then we will
>> > >>>>>>>>>>> create a new longer prefix splitting up the values based
>> upon the sort key.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Tuple representation in examples below is (key, sort key,
>> value)
>> > >>>>>>>>>>> and . is a character outside of the alphabet which can be
>> represented by
>> > >>>>>>>>>>> using an escaping encoding that wraps the key + sort key
>> encoding.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> To insert (key, 0010, value1), we lookup "key" + all the
>> > >>>>>>>>>>> prefixes of 0010 finding one that is not empty. In this
>> case its 0, so we
>> > >>>>>>>>>>> append value to the map at key.0 ending up with (we also
>> set the key to any
>> > >>>>>>>>>>> dummy value to know that it contains values):
>> > >>>>>>>>>>> key: dummy value
>> > >>>>>>>>>>> key."": token, (0010, value1)
>> > >>>>>>>>>>> Now we insert (key, 0011, value2), we again lookup "key" +
>> all
>> > >>>>>>>>>>> the prefixes of 0011, finding "", so we append value2 to
>> key."" ending up
>> > >>>>>>>>>>> with:
>> > >>>>>>>>>>> key: dummy value
>> > >>>>>>>>>>> key."": token, (0010, value1), (0011, value2)
>> > >>>>>>>>>>> Now we insert (key, 1011, value3), we again lookup "key" +
>> all
>> > >>>>>>>>>>> the prefixes of 1011 finding "" but notice that it is full,
>> so we partition
>> > >>>>>>>>>>> all the values into two prefixes 0 and 1. We also clear the
>> "" prefix
>> > >>>>>>>>>>> ending up with:
>> > >>>>>>>>>>> key: dummy value
>> > >>>>>>>>>>> key.0: token, (0010, value1), (0011, value2)
>> > >>>>>>>>>>> key.1: token, (1011, value3)
>> > >>>>>>>>>>> Now we insert (key, 0001, value4), we again lookup "key" +
>> all
>> > >>>>>>>>>>> the prefixes of the value finding 0 but notice that it is
>> full, so we
>> > >>>>>>>>>>> partition all the values into two prefixes 00 and 01 but
>> notice this
>> > >>>>>>>>>>> doesn't help us since 00 will be too full so we split 00
>> again to 000, 001.
>> > >>>>>>>>>>> We also clear the 0 prefix ending up with:
>> > >>>>>>>>>>> key: dummy value
>> > >>>>>>>>>>> key.000: token, (0001, value4)
>> > >>>>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>> > >>>>>>>>>>> key.01: token
>> > >>>>>>>>>>> key.1: token, (1011, value3)
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> We are effectively building a trie[1] where we only have
>> values
>> > >>>>>>>>>>> at the leaves and control how full each leaf can be. There
>> are other trie
>> > >>>>>>>>>>> representations like a radix tree that may be better.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Looking up the values in sorted order for "key" would go
>> like
>> > >>>>>>>>>>> this:
>> > >>>>>>>>>>> Is key set, yes
>> > >>>>>>>>>>> look for key."", miss
>> > >>>>>>>>>>> look for key.0, miss
>> > >>>>>>>>>>> look for key.00, miss
>> > >>>>>>>>>>> look for key.000, hit, sort all contained values using
>> secondary
>> > >>>>>>>>>>> key, provide value4 to user
>> > >>>>>>>>>>> look for key.001, hit, sort all contained values using
>> secondary
>> > >>>>>>>>>>> key, provide value1 followed by value2 to user
>> > >>>>>>>>>>> look for key.01, hit, empty, return no values to user
>> > >>>>>>>>>>> look for key.1, hit, sort all contained values using
>> secondary
>> > >>>>>>>>>>> key, provide value3 to user
>> > >>>>>>>>>>> we have walked the entire prefix space, signal end of
>> iterable
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Some notes for the above:
>> > >>>>>>>>>>> * The dummy value is used to know that the key contains
>> values
>> > >>>>>>>>>>> and the token is to know whether there are any values
>> deeper in the trie so
>> > >>>>>>>>>>> we know when to stop searching.
>> > >>>>>>>>>>> * If we can recalculate the sort key from the combination
>> of the
>> > >>>>>>>>>>> key and value, then we don't need to store it.
>> > >>>>>>>>>>> * Keys with lots of values will perform worse than keys with
>> > >>>>>>>>>>> fewer values since we have to look up more keys but they
>> will be empty
>> > >>>>>>>>>>> reads. The number of misses can be controlled by how many
>> elements we are
>> > >>>>>>>>>>> willing to store at a given node before we subdivide.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> In reality you could build a lot of structures (e.g. red
>> black
>> > >>>>>>>>>>> tree, binary tree) using the sort key, the issue is the
>> cost of
>> > >>>>>>>>>>> rebalancing/re-organizing the structure in map form and
>> whether it has a
>> > >>>>>>>>>>> convenient pre-order traversal for lookups.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <
>> relax@google.com>
>> > >>>>>>>>>>> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> Some great comments!
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> *Aljoscha*: absolutely this would have to be implemented by
>> > >>>>>>>>>>>> runners to be efficient. We can of course provide a
>> default (inefficient)
>> > >>>>>>>>>>>> implementation, but ideally runners would provide better
>> ones.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed by
>> > >>>>>>>>>>>> this. E.g.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> *Robert* Great point about standard coders not satisfying
>> > >>>>>>>>>>>> this. That's why I suggested that we provide a way to tag
>> the coders that
>> > >>>>>>>>>>>> do preserve order, and only accept those as key coders.
>> Alternatively we
>> > >>>>>>>>>>>> could present a more limited API - e.g. only allowing a
>> hard-coded set of
>> > >>>>>>>>>>>> types to be used as keys - but that seems counter to the
>> direction Beam
>> > >>>>>>>>>>>> usually goes. So users will have two ways of creating
>> multimap state specs:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>>
>> state =
>> > >>>>>>>>>>>> StateSpecs.multimap(VarLongCoder.of(),
>> StringUtf8Coder.of());
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> or
>> > >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>>
>> state =
>> > >>>>>>>>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(),
>> StringUtf8Coder.of());
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> The second one will validate that the key coder preserves
>> > >>>>>>>>>>>> order, and fails otherwise (similar to coder determinism
>> checking in
>> > >>>>>>>>>>>> GroupByKey). (BTW we would also have versions of these
>> functions that use
>> > >>>>>>>>>>>> coder inference to "guess" the coder, but those will do
>> the same checking)
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Also the API I proposed did support random access! We could
>> > >>>>>>>>>>>> separate out OrderedBagState again if we think the use
>> cases are
>> > >>>>>>>>>>>> fundamentally different. I merged the proposal into that
>> of MultimapState
>> > >>>>>>>>>>>> because there seemed to be 99% overlap.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Reuven
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <
>> > >>>>>>>>>>>> robertwb@google.com> wrote:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <
>> relax@google.com>
>> > >>>>>>>>>>>>> wrote:
>> > >>>>>>>>>>>>> >
>> > >>>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <
>> > >>>>>>>>>>>>> altay@google.com> wrote:
>> > >>>>>>>>>>>>> >>
>> > >>>>>>>>>>>>> >>
>> > >>>>>>>>>>>>> >>
>> > >>>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <
>> > >>>>>>>>>>>>> lcwik@google.com> wrote:
>> > >>>>>>>>>>>>> >>>
>> > >>>>>>>>>>>>> >>>
>> > >>>>>>>>>>>>> >>>
>> > >>>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <
>> > >>>>>>>>>>>>> ruwang@google.com> wrote:
>> > >>>>>>>>>>>>> >>>>>
>> > >>>>>>>>>>>>> >>>>> A few obvious problems with this code:
>> > >>>>>>>>>>>>> >>>>>   1. Removing the elements already processed from
>> the
>> > >>>>>>>>>>>>> bag requires clearing and rewriting the entire bag. This
>> is O(n^2) in the
>> > >>>>>>>>>>>>> number of input trades.
>> > >>>>>>>>>>>>> >>>>
>> > >>>>>>>>>>>>> >>>> Why isn't it O(2 * n) to clear and rewrite the trade
>> > >>>>>>>>>>>>> state?
>> > >>>>>>>>>>>>> >>>>
>> > >>>>>>>>>>>>> >>>>>
>> > >>>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends
>> State
>> > >>>>>>>>>>>>> {
>> > >>>>>>>>>>>>> >>>>>   // Add a value to the map.
>> > >>>>>>>>>>>>> >>>>>   void put(K key, V value);
>> > >>>>>>>>>>>>> >>>>>   // Get all values for a given key.
>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>> > >>>>>>>>>>>>> >>>>>  // Return all entries in the map.
>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>> > >>>>>>>>>>>>> >>>>>   // Return all entries in the map with keys <=
>> limit.
>> > >>>>>>>>>>>>> returned elements are sorted by the key.
>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K
>> > >>>>>>>>>>>>> limit);
>> > >>>>>>>>>>>>> >>>>>
>> > >>>>>>>>>>>>> >>>>>  // Remove all values with the given key;
>> > >>>>>>>>>>>>> >>>>>   void remove(K key);
>> > >>>>>>>>>>>>> >>>>>  // Remove all entries in the map with keys <=
>> limit.
>> > >>>>>>>>>>>>> >>>>>   void removeUntil(K limit);
>> > >>>>>>>>>>>>> >>>>
>> > >>>>>>>>>>>>> >>>> Will removeUntilExcl(K limit) also be useful? It will
>> remove
>> > >>>>>>>>>>>>> all entries in the map with keys < limit.
>> > >>>>>>>>>>>>> >>>>
>> > >>>>>>>>>>>>> >>>>>
>> > >>>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of the
>> key.
>> > >>>>>>>>>>>>> In order to make this easier for users, I propose that we
>> introduce a new
>> > >>>>>>>>>>>>> tag on Coders PreservesOrder. A Coder that contains this
>> tag guarantees
>> > >>>>>>>>>>>>> that the encoded value preserves the same ordering as the
>> base Java type.
>> > >>>>>>>>>>>>> >>>>
>> > >>>>>>>>>>>>> >>>>
>> > >>>>>>>>>>>>> >>>> Could you clarify what is  "encoded value preserves
>> the
>> > >>>>>>>>>>>>> same ordering as the base Java type"?
>> > >>>>>>>>>>>>> >>>
>> > >>>>>>>>>>>>> >>>
>> > >>>>>>>>>>>>> >>> Let's say A and B represent two different instances of
>> the
>> > >>>>>>>>>>>>> same Java type like a double, then A < B (using the
>> language's comparison
>> > >>>>>>>>>>>>> operator) iff encode(A) < encode(B) (note the encoded
>> versions are compared
>> > >>>>>>>>>>>>> lexicographically)
>> > >>>>>>>>>>>>> >>
>> > >>>>>>>>>>>>> >>
>> > >>>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect A <
>> B iff
>> > >>>>>>>>>>>>> e(A) < e(B) property to hold for all languages we
>> support? What happens if A and
>> > >>>>>>>>>>>>> B sort differently in different languages?
>> > >>>>>>>>>>>>> >
>> > >>>>>>>>>>>>> >
>> > >>>>>>>>>>>>> > That would have to be the property of the coder (which
>> means
>> > >>>>>>>>>>>>> that this property probably needs to be represented in
>> the portability
>> > >>>>>>>>>>>>> representation of the coder). I imagine the common use
>> cases will be for
>> > >>>>>>>>>>>>> simple coders like int, long, string, etc., which are
>> likely to sort the
>> > >>>>>>>>>>>>> same in most languages.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> The standard coders for both double and integral types do
>> not
>> > >>>>>>>>>>>>> respect
>> > >>>>>>>>>>>>> the natural ordering (consider negative values). KV coders
>> > >>>>>>>>>>>>> violate the
>> > >>>>>>>>>>>>> "natural" lexicographic ordering on components as well. I
>> think
>> > >>>>>>>>>>>>> implicitly sorting on encoded value would yield many
>> > >>>>>>>>>>>>> surprises. (The
>> > >>>>>>>>>>>>> state, of course, could take an order-preserving, bytes
>> > >>>>>>>>>>>>> (string?)-producing callable as a parameter).
>> (As for
>> > >>>>>>>>>>>>> naming, I'd probably call this OrderedBagState or
>> something
>> > >>>>>>>>>>>>> like
>> > >>>>>>>>>>>>> that...rather than Map which tends to imply random
>> access.)
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> >
>>
>

Re: [External] Re: DISCUSS: Sorted MapState API

Posted by Reuven Lax <re...@google.com>.
Hi,

Currently we only support removing a timestamp range. You can of course
remove a single timestamp by removing the range [ts, ts+1); however, if
there are multiple elements with the same timestamp, this will remove all
of those elements.
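
To make the range semantics concrete, here is a minimal in-memory sketch. It
is not Beam's OrderedListState implementation: the class name
TimeOrderedListSketch, the use of plain long millisecond timestamps, and the
String values are all assumptions made for illustration. It only shows that
clearing the half-open range [ts, ts+1) drops every element sharing that
timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical stand-in for a timestamp-ordered list state; not Beam code. */
public class TimeOrderedListSketch {
  // Values grouped by timestamp (millis); the TreeMap keeps timestamps sorted.
  private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();

  /** Adds a value at the given timestamp; duplicates of a timestamp are kept. */
  public void add(long timestampMillis, String value) {
    byTimestamp.computeIfAbsent(timestampMillis, ts -> new ArrayList<>()).add(value);
  }

  /** Returns values with timestamps in [startMillis, endMillis), ordered by timestamp. */
  public List<String> readRange(long startMillis, long endMillis) {
    List<String> out = new ArrayList<>();
    for (List<String> values :
        byTimestamp.subMap(startMillis, true, endMillis, false).values()) {
      out.addAll(values);
    }
    return out;
  }

  /** Removes every value with a timestamp in [startMillis, endMillis). */
  public void clearRange(long startMillis, long endMillis) {
    // The subMap view is backed by the map, so clearing it removes the entries.
    byTimestamp.subMap(startMillis, true, endMillis, false).clear();
  }

  public static void main(String[] args) {
    TimeOrderedListSketch state = new TimeOrderedListSketch();
    state.add(5L, "a");
    state.add(5L, "b"); // same timestamp as "a"
    state.add(6L, "c");

    // Removing the "single timestamp" 5 as the range [5, 6) drops both "a" and "b".
    state.clearRange(5L, 6L);
    System.out.println(state.readRange(Long.MIN_VALUE, Long.MAX_VALUE)); // prints [c]
  }
}
```

If removing one specific element among several with the same timestamp is
required, a range-based API like this cannot express it directly, which is
why a keyed state may be a better fit for that case.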

Does this fit your use case? If not, I wonder if MapState is closer to what
you are looking for?

Reuven

On Tue, Sep 15, 2020 at 2:33 PM Tyson Hamilton <ty...@google.com> wrote:

> Hi Reuven,
>
> I noticed that there was an implementation of the in-memory
> OrderedListState introduced [1]. Where can I find out more regarding the
> plan and design? Is there a design doc? I'd like to know more details about
> the implementation to see if it fits my use case. I was hoping it would
> have a remove(TimestampedValue<T> e) method.
>
> Thanks,
> -Tyson
>
>
> [1]:
> https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41
>
>
> On 2020/08/03 21:41:46, Catlyn Kong <ca...@yelp.com> wrote:
> > Hey folks,
> >
> > Sry I'm late to this thread but this might be very helpful for the
> problem
> > we're dealing with. Do we have a design doc or a jira ticket I can
> follow?
> >
> > Cheers,
> > Catlyn
> >
> > On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > > My questions were just an example. I fully agree there is a fundamental
> > > need for a sorted state (of some form, and I also think this links to
> > > > efficient implementation of retractions) - I was reacting to Kenn's
> question
> > > about BIP. This one would be pretty nice example why it would be good
> to
> > > have such a "process" - not everything can be solved on ML and there
> are
> > > fundamental decisions that might need a closer attention.
> > > On 6/18/20 5:28 PM, Reuven Lax wrote:
> > >
> > > Jan - my proposal is exactly TimeSortedBagState (more accurately -
> > > TimeSortedListState), though I went a bit further and also proposed a
> way
> > > to have a dynamic number of tagged TimeSortedBagStates.
> > >
> > > You are correct that the runner doesn't really have to store the data
> time
> > > sorted - what's actually needed is the ability to fetch and remove
> > > timestamp ranges of data (though that does include fetching the entire
> > > > list); TimeOrderedState is probably a more accurate name than
> > > TimeSortedState. I don't think we could get away with operations that
> only
> > > act on the smallest timestamp, however we could limit the API to only
> being
> > > able to fetch and remove prefixes of data (ordered by timestamp).
> However
> > > if we support prefixes, we might as well support arbitrary subranges.
> > >
> > > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <je...@seznam.cz> wrote:
> > >
> > >> Big +1 for a BIP, as this might really help clarify all the pros and
> cons
> > >> of all possibilities. There seem to be questions that need answering
> and
> > >> motivating use cases - do we need sorted map state or can we solve
> our use
> > >> cases by something simpler - e.g. the mentioned TimeSortedBagState?
> Does
> > >> that really have to be time-sorted structure, or does it "only" have
> to
> > >> have operations that can efficiently find and remove element with
> smallest
> > >> timestamp (like a PriorityQueue)?
> > >>
> > >> Jan
> > >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
> > >>
> > >> Zooming in from generic philosophy to be clear: adding time ordered
> > > >> buffer to the Fn state API is *not* a shortcut. It has benefits that
> will
> > >> not be achieved by SDK-side implementation on top of either ordered or
> > >> unordered multimap. Are those benefits worth expanding the API? I
> don't
> > >> know.
> > >>
> > >> A change to allow a runner to have a specialized implementation for
> > >> time-buffered state would be one or more StateKey types, right?
> Reuven,
> > >> maybe put this and your Java API in a doc? A BIP? Seems like there's
> at
> > >> least the following to explore:
> > >>
> > >>  - how that Java API would map to an SDK-side implementation on top of
> > >> multimap state key
> > >>  - how that Java API would map to a new StateKey
> > >>  - whether there's actually more than one relevant implementation of
> that
> > >> StateKey
> > >>  - whether SDK-side implementation on some other state key would be
> > >> performant enough in all SDK languages (present and future)
> > >>
> > >> Zooming back out to generic philosophy: Proliferation of StateKey
> > >> types tuned by runners (which can very easily still share
> implementation)
> > >> is probably better than proliferation of complex SDK-side
> implementations
> > >> with varying completeness and performance.
> > >>
> > >> Kenn
> > >>
> > >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <re...@google.com> wrote:
> > >>
> > >>> It might help for me to describe what I have in mind. I'm still
> > >>> proposing that we build multimap, just not a globally-sorted
> multimap.
> > >>>
> > >>> My previous proposal was that we provide a Multimap<Key, Value> state
> > >>> type, sorted by key. This would have two additional operations -
> > >>> multimap.getRange(startKey, endKey) and
> multimap.deleteRange(startKey,
> > >>> endKey). The primary use case was timestamp sorting, but I felt that
> a
> > >>> sorted multimap provided a nice generalization - after all, you can
> simply
> > >>> key the multimap by timestamp to get timestamp sorting.
> > >>>
> > >>> This approach had some issues immediately that would take some work
> to
> > >>> solve. Since a multimap key can have any type and a runner will only
> be
> > >>> able to sort by encoded type, we would need to introduce a concept of
> > >>> order-preserving coders into Beam and plumb that through. Robert
> pointed
> > >>> out that even our existing standard coders for simple integral types
> don't
> > >>> preserve order, so there will likely be surprises here.
> > >>>
> > >>> My current proposal is for a multimap that is not sorted by key, but
> > >>> that can support ordered values for a single key. Remember that a
> multimap
> > >>> maps K -> Iterable<V>, so this means that each individual
> Iterable<V> is
> > >>> ordered, but the keys have no specific order relative to each other.
> This
> > >>> is not too different from many multimap implementations where the
> keys are
> > >>> unordered, but the list of values for a single key at least has a
> stable
> > >>> order.
> > >>>
> > >>> The interface would look like this:
> > >>>
> > >>> public interface MultimapState<K, V> extends State {
> > >>>   // Add a value with a default timestamp.
> > >>>   void put(K key, V value);
> > >>>
> > >>>   // Add a timestamped value.
> > >>>   void put(K key, TimestampedValue<V> value);
> > >>>
> > >>>   // Remove all values for a key.
> > >>>   void remove(K key);
> > >>>
> > >>>   // Remove all values for a key with timestamps within the specified
> > >>>   // range.
> > >>>   void removeRange(K key, Instant startTs, Instant endTs);
> > >>>
> > >>>   // Get an Iterable of values for a key. The Iterable will be returned
> > >>>   // sorted by timestamp.
> > >>>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
> > >>>
> > >>>   // Get an Iterable of values for a key in the specified range. The
> > >>>   // Iterable will be returned sorted by timestamp.
> > >>>   ReadableState<Iterable<TimestampedValue<V>>> getRange(
> > >>>       K key, Instant startTs, Instant endTs);
> > >>>
> > >>>   ReadableState<Iterable<K>> keys();
> > >>>   ReadableState<Iterable<TimestampedValue<V>>> values();
> > >>>   ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
> > >>> }
> > >>>
> > >>> We can of course provide helper functions that allow using
> MultimapState
> > >>> without dealing with TimestampedValue for users who only want a
> multimap and
> > >>> don't want sorting.
> > >>>
> > >>> I think many users will only need a single sorted list - not a full
> > >>> multimap. It's worth offering this as well, and we can simply build
> it on
> > >>> top of MultimapState. It will look like an extension of BagState:
> > >>>
> > >>> public interface TimestampSortedListState<T> extends State {
> > >>>   void add(TimestampedValue<T> value);
> > >>>   Iterable<TimestampedValue<T>> read();
> > >>>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant
> > >>> endTs);
> > >>>   void clearRange(Instant startTs, Instant endTs);
> > >>> }
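A minimal in-memory sketch of the TimestampSortedListState shape proposed above, for illustration only. It assumes java.time.Instant in place of the Joda Instant that Beam uses, a local Stamped<V> holder in place of TimestampedValue<T>, and half-open [startTs, endTs) ranges. None of these choices are specified in the thread, and the class name is hypothetical.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the proposed TimestampSortedListState<T>.
class InMemoryTimestampSortedList<T> {
  // Local substitute for Beam's TimestampedValue<T> (an assumption of this sketch).
  static final class Stamped<V> {
    final Instant timestamp;
    final V value;

    Stamped(Instant timestamp, V value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // One bucket per timestamp; the TreeMap keeps buckets in timestamp order.
  private final TreeMap<Instant, List<T>> buckets = new TreeMap<>();

  void add(Instant timestamp, T value) {
    buckets.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(value);
  }

  // All values, ordered by timestamp (insertion order within one timestamp).
  List<Stamped<T>> read() {
    return flatten(buckets);
  }

  // Values with startTs <= timestamp < endTs, ordered by timestamp.
  List<Stamped<T>> readRange(Instant startTs, Instant endTs) {
    return flatten(buckets.subMap(startTs, true, endTs, false));
  }

  // Drops every value with startTs <= timestamp < endTs.
  void clearRange(Instant startTs, Instant endTs) {
    buckets.subMap(startTs, true, endTs, false).clear();
  }

  private List<Stamped<T>> flatten(NavigableMap<Instant, List<T>> view) {
    List<Stamped<T>> out = new ArrayList<>();
    for (Map.Entry<Instant, List<T>> bucket : view.entrySet()) {
      for (T value : bucket.getValue()) {
        out.add(new Stamped<>(bucket.getKey(), value));
      }
    }
    return out;
  }
}
```

In this sketch clearRange operates on a view of the sorted map, so dropping a range of old timestamps never rewrites the values that remain. That is the access pattern the proposal asks runners to support; how a real runner would store the data is left open in the thread.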
> > >>>
> > >>>
> > >>> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <lc...@google.com> wrote:
> > >>>
> > >>>> The portability layer is meant to live across multiple versions of
> Beam
> > >>>> and I don't think it should be treated by doing the simple and
> useful thing
> > >>>> now since I believe it will lead to a proliferation of the API.
> > >>>>
> > >>>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <ke...@apache.org>
> > >>>> wrote:
> > >>>>
> > >>>>> I have thoughts on the subject of whether to have APIs just for the
> > >>>>> lowest-level building blocks versus having APIs for higher-level
> > >>>>> constructs. Specifically this applies to providing only unsorted
> multimap
> > >>>>> vs what I will call "time-ordered buffer". TL;DR: I'd vote to
> focus on
> > >>>>> time-ordered buffer; if it turns out to be easy to go all the way
> to sorted
> > >>>>> multimap that's nice-to-have; if it turns out to be easy to
> implement on
> > >>>>> top of unsorted map state that should probably be under the hood.
> > >>>>>
> > >>>>> Reasons to build low-level multimap in the runner & fn api and
> layer
> > >>>>> higher-level things in the SDK:
> > >>>>>
> > >>>>>  - It is less implementation for runners if they have to only
> provide
> > >>>>> fewer lower-level building blocks like multimap state.
> > >>>>>  - There are many more runners than SDKs (and will be even more and
> > >>>>> more) so this saves overall.
> > >>>>>
> > >>>>> Reasons to build higher-level constructs directly in the runner
> and fn
> > >>>>> api:
> > >>>>>
> > >>>>>  - Having multiple higher-level state types may actually be less
> > >>>>> implementation than one complex state type, especially if they map
> to
> > >>>>> runner primitives.
> > >>>>>  - The runner may have better specialized implementations,
> especially
> > >>>>> for something like a time-ordered buffer.
> > >>>>>  - The particular access patterns in an SDK-based implementation
> may
> > >>>>> not be ideal for each runner's underlying implementation of the
> low-level
> > >>>>> building block.
> > >>>>>  - There may be excessive gRPC overhead even for optimal access
> > >>>>> patterns.
> > >>>>>
> > >>>>> There are ways to have best of both worlds, like:
> > >>>>>
> > >>>>> 1. Define multiple state types according to fundamental access
> > >>>>> patterns, like we did this before portability.
> > >>>>> 2. If it is easy to layer one on top of the other, do that inside
> the
> > >>>>> runner. Provide shared code so for runners providing the
> lowest-level
> > >>>>> primitive they get all the types for free.
> > >>>>>
> > >>>>> I understand that this is an oversimplification. It still creates
> some
> > >>>>> more work. And APIs are a burden so it is good to introduce as few
> as
> > >>>>> possible for maintenance. But it has performance benefits and also
> unblocks
> > >>>>> "just doing the simple and useful thing now" which I always like
> to do as
> > >>>>> long as it is compatible with future changes. If the APIs are
> fundamental,
> > >>>>> like sets, maps, timestamp ordering, then it is safe to guess that
> they
> > >>>>> will change rarely and be useful forever.
> > >>>>>
> > >>>>> Kenn
> > >>>>>
> > >>>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <lc...@google.com>
> wrote:
> > >>>>>
> > >>>>>> I would be glad to take a stab at how to provide sorting on top of
> > >>>>>> unsorted multimap state.
> > >>>>>> Based upon your description, you want integer keys representing
> > >>>>>> timestamps and arbitrary user value for the values, is that
> correct?
> > >>>>>> What kinds of operations do you need on the sorted map state in
> order
> > >>>>>> of efficiency requirements?
> > >>>>>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)),
> ClearAll(Range[x, y))
> > >>>>>> What kinds of operations do we expect the underlying unsorted map
> > >>>>>> state to be able to provide?
> > >>>>>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g.
> > >>>>>> enumerate(K)?)
> > >>>>>>
> > >>>>>> I went through a similar exercise of how to provide a list like
> side
> > >>>>>> input view over a multimap[1] side input which efficiently allowed
> > >>>>>> computation of size and provided random access while only having
> access to
> > >>>>>> get(K) and enumerate K's.
> > >>>>>>
> > >>>>>> 1:
> > >>>>>>
> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
> > >>>>>>
> > >>>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <re...@google.com>
> wrote:
> > >>>>>>
> > >>>>>>> Bringing this subject up again,
> > >>>>>>>
> > >>>>>>> I've spent some time looking into implementing this for the
> Dataflow
> > >>>>>>> runner. I'm unable to find a way to implement the arbitrary
> sorted multimap
> > >>>>>>> efficiently for the case where there are large numbers of unique
> keys.
> > >>>>>>> Since the primary driving use case is timestamp ordering (i.e.
> key is event
> > >>>>>>> timestamp), you would expect to have nearly a new key per
> element. I
> > >>>>>>> considered Luke's suggestion above, but unfortunately it doesn't
> really
> > >>>>>>> solve this issue.
> > >>>>>>>
> > >>>>>>> The primary use case for sorting always seems to be sorting by
> > >>>>>>> timestamp. I want to propose that instead of building the
> fully-general
> > >>>>>>> sorted multimap, we instead focus on a state type where the sort
> key is an
> > >>>>>>> integral type (like a timestamp or an integer). There is still a
> valid use
> > >>>>>>> case for multimap, but we can provide that as an unordered
> state. At least
> > >>>>>>> for Dataflow, it will be much easier.
> > >>>>>>>
> > >>>>>>> While my difficulties here may be specific to the Dataflow
> runner,
> > >>>>>>> any such support would have to be built into other runners as
> well, and
> > >>>>>>> limiting to integral sorting likely makes it easier for other
> runners to
> > >>>>>>> implement this. Also, if you look at this
> > >>>>>>> <
> https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>
> Flink
> > >>>>>>> comment pointed out by Aljoscha, for Flink the main use case
> identified was
> > >>>>>>> also timestamp sorting. This will also simplify the API design
> for this
> > >>>>>>> feature: Sorted multimap with arbitrary keys would require us to
> introduce
> > >>>>>>> a way of mapping natural ordering to encoded ordering (i.e. a new
> > >>>>>>> OrderPreservingCoder), but if we limit sort keys to integral
> types, the API
> > >>>>>>> design is simpler as integral types can be represented directly.
> > >>>>>>>
> > >>>>>>> Reuven
> > >>>>>>>
> > >>>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <re...@google.com>
> wrote:
> > >>>>>>>
> > >>>>>>>> This sounds to me like a potential runner strategy. However if a
> > >>>>>>>> runner can natively support sorted maps (e.g. we expect the
> Dataflow runner
> > >>>>>>>> to be able to do so, and I think it would be useful for other
> runners as
> > >>>>>>>> well), then it's probably preferable to allow the runner to use
> its native
> > >>>>>>>> capabilities.
> > >>>>>>>>
> > >>>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <lc...@google.com>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> For the API that you proposed, the map key is always "void" and
> > >>>>>>>>> the sort key == user key. So in my example of
> > >>>>>>>>> key: dummy value
> > >>>>>>>>> key.000: token, (0001, value4)
> > >>>>>>>>> key.001: token, (0010, value1), (0011, value2)
> > >>>>>>>>> key.01: token
> > >>>>>>>>> key.1: token, (1011, value3)
> > >>>>>>>>> you would have:
> > >>>>>>>>> "void": dummy value
> > >>>>>>>>> "void".000: token, (0001, value4)
> > >>>>>>>>> "void".001: token, (0010, value1), (0011, value2)
> > >>>>>>>>> "void".01: token
> > >>>>>>>>> "void".1: token, (1011, value3)
> > >>>>>>>>>
> > >>>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into
> walking
> > >>>>>>>>> the prefixes until you find a common prefix for K and then
> filter for
> > >>>>>>>>> values where they have a sort key <= K. Using the example
> above, to find
> > >>>>>>>>> entriesUntil(0010) you would:
> > >>>>>>>>> look for key."", miss
> > >>>>>>>>> look for key.0, miss
> > >>>>>>>>> look for key.00, miss
> > >>>>>>>>> look for key.000, hit, sort all contained values using
> secondary
> > >>>>>>>>> key, provide value4 to user
> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so
> we
> > >>>>>>>>> sort all contained values using secondary key, filter out
> value2 and
> > >>>>>>>>> provide value1
> > >>>>>>>>>
> > >>>>>>>>> void removeUntil(K limit) also translates into walking the
> > >>>>>>>>> prefixes but instead we will clear them when we have a "hit"
> with some
> > >>>>>>>>> special logic for when the sort key is a prefix of the key.
> Using the
> > >>>>>>>>> example, to removeUntil(0010) you would:
> > >>>>>>>>> look for key."", miss
> > >>>>>>>>> look for key.0, miss
> > >>>>>>>>> look for key.00, miss
> > >>>>>>>>> look for key.000, hit, clear
> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so
> we
> > >>>>>>>>> sort all contained values using secondary key, store in memory
> all values
> > >>>>>>>>> that are > 0010, then clear and append the values stored in memory.
> > >>>>>>>>>
> > >>>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <re...@google.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Can you explain how fetching and deleting ranges of keys would
> > >>>>>>>>>> work with this data structure?
> > >>>>>>>>>>
> > >>>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <lcwik@google.com
> >
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Reuven, for the example, I assume that we never want to store
> > >>>>>>>>>>> more than 2 values at a given sort key prefix, and if we do
> then we will
> > >>>>>>>>>>> create a new longer prefix splitting up the values based
> upon the sort key.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Tuple representation in examples below is (key, sort key,
> value)
> > >>>>>>>>>>> and . is a character outside of the alphabet which can be
> represented by
> > >>>>>>>>>>> using an escaping encoding that wraps the key + sort key
> encoding.
> > >>>>>>>>>>>
> > >>>>>>>>>>> To insert (key, 0010, value1), we look up "key" + all the
> > >>>>>>>>>>> prefixes of 0010, finding one that is not empty. In this case
> > >>>>>>>>>>> it's "", so we append the value to the map at key."", ending up
> > >>>>>>>>>>> with (we also set the key to any dummy value to know that it
> > >>>>>>>>>>> contains values):
> > >>>>>>>>>>> key: dummy value
> > >>>>>>>>>>> key."": token, (0010, value1)
> > >>>>>>>>>>> Now we insert (key, 0011, value2), we again lookup "key" +
> all
> > >>>>>>>>>>> the prefixes of 0011, finding "", so we append value2 to
> key."" ending up
> > >>>>>>>>>>> with:
> > >>>>>>>>>>> key: dummy value
> > >>>>>>>>>>> key."": token, (0010, value1), (0011, value2)
> > >>>>>>>>>>> Now we insert (key, 1011, value3), we again lookup "key" +
> all
> > >>>>>>>>>>> the prefixes of 1011 finding "" but notice that it is full,
> so we partition
> > >>>>>>>>>>> all the values into two prefixes 0 and 1. We also clear the
> "" prefix
> > >>>>>>>>>>> ending up with:
> > >>>>>>>>>>> key: dummy value
> > >>>>>>>>>>> key.0: token, (0010, value1), (0011, value2)
> > >>>>>>>>>>> key.1: token, (1011, value3)
> > >>>>>>>>>>> Now we insert (key, 0001, value4), we again lookup "key" +
> all
> > >>>>>>>>>>> the prefixes of the value finding 0 but notice that it is
> full, so we
> > >>>>>>>>>>> partition all the values into two prefixes 00 and 01 but
> notice this
> > >>>>>>>>>>> doesn't help us since 00 will be too full so we split 00
> again to 000, 001.
> > >>>>>>>>>>> We also clear the 0 prefix ending up with:
> > >>>>>>>>>>> key: dummy value
> > >>>>>>>>>>> key.000: token, (0001, value4)
> > >>>>>>>>>>> key.001: token, (0010, value1), (0011, value2)
> > >>>>>>>>>>> key.01: token
> > >>>>>>>>>>> key.1: token, (1011, value3)
> > >>>>>>>>>>>
> > >>>>>>>>>>> We are effectively building a trie[1] where we only have
> values
> > >>>>>>>>>>> at the leaves and control how full each leaf can be. There
> are other trie
> > >>>>>>>>>>> representations like a radix tree that may be better.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Looking up the values in sorted order for "key" would go like
> > >>>>>>>>>>> this:
> > >>>>>>>>>>> Is key set, yes
> > >>>>>>>>>>> look for key."", miss
> > >>>>>>>>>>> look for key.0, miss
> > >>>>>>>>>>> look for key.00, miss
> > >>>>>>>>>>> look for key.000, hit, sort all contained values using
> secondary
> > >>>>>>>>>>> key, provide value4 to user
> > >>>>>>>>>>> look for key.001, hit, sort all contained values using
> secondary
> > >>>>>>>>>>> key, provide value1 followed by value2 to user
> > >>>>>>>>>>> look for key.01, hit, empty, return no values to user
> > >>>>>>>>>>> look for key.1, hit, sort all contained values using
> secondary
> > >>>>>>>>>>> key, provide value3 to user
> > >>>>>>>>>>> we have walked the entire prefix space, signal end of
> iterable
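The insert and sorted-read walk-throughs above can be sketched as follows. This is a simplified, hypothetical rendering rather than the design as proposed: a HashMap stands in for the unsorted map state, sort keys are assumed to be fixed-width bit strings, and the dummy value and per-leaf tokens are replaced by the invariant that the stored leaf prefixes always partition the key space. The class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: sorted reads on top of an unsorted map, using bit-prefix leaves.
class PrefixBucketedState {
  static final int LEAF_CAPACITY = 2; // split a leaf once it holds more than this

  static final class Entry {
    final String sortKey; // fixed-width bit string such as "0010" (an assumption)
    final String value;

    Entry(String sortKey, String value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  // Stand-in for unsorted map state: only exact-key get, put and remove are used.
  private final Map<String, List<Entry>> leaves = new HashMap<>();

  void insert(String sortKey, String value) {
    if (leaves.isEmpty()) {
      leaves.put("", new ArrayList<>()); // first write: the empty prefix is the only leaf
    }
    String prefix = findLeaf(sortKey);
    leaves.get(prefix).add(new Entry(sortKey, value));
    // Split the overfull leaf into its "0" and "1" children until the entries fit.
    while (leaves.get(prefix).size() > LEAF_CAPACITY && prefix.length() < sortKey.length()) {
      List<Entry> zeros = new ArrayList<>();
      List<Entry> ones = new ArrayList<>();
      for (Entry e : leaves.remove(prefix)) {
        (e.sortKey.charAt(prefix.length()) == '0' ? zeros : ones).add(e);
      }
      leaves.put(prefix + "0", zeros);
      leaves.put(prefix + "1", ones);
      prefix += zeros.size() > LEAF_CAPACITY ? "0" : "1";
    }
  }

  // Probe "", then each longer prefix of the sort key, until a stored leaf is hit.
  private String findLeaf(String sortKey) {
    for (int len = 0; len <= sortKey.length(); len++) {
      String prefix = sortKey.substring(0, len);
      if (leaves.containsKey(prefix)) {
        return prefix;
      }
    }
    throw new IllegalStateException("no leaf covers " + sortKey);
  }

  // Pre-order walk of the prefix space; a miss means "descend into both children".
  List<String> readSorted() {
    List<String> out = new ArrayList<>();
    if (!leaves.isEmpty()) {
      walk("", out);
    }
    return out;
  }

  private void walk(String prefix, List<String> out) {
    List<Entry> bucket = leaves.get(prefix);
    if (bucket == null) {
      walk(prefix + "0", out);
      walk(prefix + "1", out);
      return;
    }
    List<Entry> sorted = new ArrayList<>(bucket);
    sorted.sort(Comparator.comparing((Entry e) -> e.sortKey));
    for (Entry e : sorted) {
      out.add(e.value);
    }
  }

  public static void main(String[] args) {
    PrefixBucketedState state = new PrefixBucketedState();
    state.insert("0010", "value1");
    state.insert("0011", "value2");
    state.insert("1011", "value3");
    state.insert("0001", "value4");
    System.out.println(state.readSorted()); // [value4, value1, value2, value3]
  }
}
```

Running the four inserts from the example leaves the leaves 000, 001, 01 and 1, matching the final state shown above. The leaf capacity is the knob mentioned in the notes: a larger capacity means fewer missed lookups per read but more in-memory sorting per leaf.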
> > >>>>>>>>>>>
> > >>>>>>>>>>> Some notes for the above:
> > >>>>>>>>>>> * The dummy value is used to know that the key contains
> values
> > >>>>>>>>>>> and the token is to know whether there are any values deeper
> in the trie so
> > >>>>>>>>>>> we know when to stop searching.
> > >>>>>>>>>>> * If we can recalculate the sort key from the combination of
> the
> > >>>>>>>>>>> key and value, then we don't need to store it.
> > >>>>>>>>>>> * Keys with lots of values will perform worse than keys with
> > >>>>>>>>>>> fewer values since we have to look up more keys but they will
> be empty
> > >>>>>>>>>>> reads. The number of misses can be controlled by how many
> elements we are
> > >>>>>>>>>>> willing to store at a given node before we subdivide.
> > >>>>>>>>>>>
> > >>>>>>>>>>> In reality you could build a lot of structures (e.g. red
> black
> > >>>>>>>>>>> tree, binary tree) using the sort key, the issue is the cost
> of
> > >>>>>>>>>>> rebalancing/re-organizing the structure in map form and
> whether it has a
> > >>>>>>>>>>> convenient pre-order traversal for lookups.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <relax@google.com
> >
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Some great comments!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> *Aljoscha*: absolutely this would have to be implemented by
> > >>>>>>>>>>>> runners to be efficient. We can of course provide a default
> (inefficient)
> > >>>>>>>>>>>> implementation, but ideally runners would provide better
> ones.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed by
> > >>>>>>>>>>>> this. E.g.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> *Robert* Great point about standard coders not satisfying
> > >>>>>>>>>>>> this. That's why I suggested that we provide a way to tag
> the coders that
> > >>>>>>>>>>>> do preserve order, and only accept those as key coders
> Alternatively we
> > >>>>>>>>>>>> could present a more limited API - e.g. only allowing a
> hard-coded set of
> > >>>>>>>>>>>> types to be used as keys - but that seems counter to the
> direction Beam
> > >>>>>>>>>>>> usually goes. So users will have two ways of creating
> multimap state specs:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>>
> state =
> > >>>>>>>>>>>> StateSpecs.multimap(VarLongCoder.of(),
> StringUtf8Coder.of());
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> or
> > >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>>
> state =
> > >>>>>>>>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(),
> StringUtf8Coder.of());
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The second one will validate that the key coder preserves
> > >>>>>>>>>>>> order, and fails otherwise (similar to coder determinism
> checking in
> > >>>>>>>>>>>> GroupByKey). (BTW we would also have versions of these
> functions that use
> > >>>>>>>>>>>> coder inference to "guess" the coder, but those will do the
> same checking)
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Also the API I proposed did support random access! We could
> > >>>>>>>>>>>> separate out OrderedBagState again if we think the use
> cases are
> > >>>>>>>>>>>> fundamentally different. I merged the proposal into that of
> MultimapState
> > >>>>>>>>>>>> because there seemed to be 99% overlap.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Reuven
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <
> > >>>>>>>>>>>> robertwb@google.com> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <
> relax@google.com>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>> >
> > >>>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <
> > >>>>>>>>>>>>> altay@google.com> wrote:
> > >>>>>>>>>>>>> >>
> > >>>>>>>>>>>>> >>
> > >>>>>>>>>>>>> >>
> > >>>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <
> > >>>>>>>>>>>>> lcwik@google.com> wrote:
> > >>>>>>>>>>>>> >>>
> > >>>>>>>>>>>>> >>>
> > >>>>>>>>>>>>> >>>
> > >>>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <
> > >>>>>>>>>>>>> ruwang@google.com> wrote:
> > >>>>>>>>>>>>> >>>>>
> > >>>>>>>>>>>>> >>>>> A few obvious problems with this code:
> > >>>>>>>>>>>>> >>>>>   1. Removing the elements already processed from the
> > >>>>>>>>>>>>> bag requires clearing and rewriting the entire bag. This
> is O(n^2) in the
> > >>>>>>>>>>>>> number of input trades.
> > >>>>>>>>>>>>> >>>>
> > >>>>>>>>>>>>> >>>> Why isn't it O(2 * n) to clear and rewrite the trade
> > >>>>>>>>>>>>> state?
> > >>>>>>>>>>>>> >>>>
> > >>>>>>>>>>>>> >>>>>
> > >>>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends
> State
> > >>>>>>>>>>>>> {
> > >>>>>>>>>>>>> >>>>>   // Add a value to the map.
> > >>>>>>>>>>>>> >>>>>   void put(K key, V value);
> > >>>>>>>>>>>>> >>>>>   // Get all values for a given key.
> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
> > >>>>>>>>>>>>> >>>>>  // Return all entries in the map.
> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
> > >>>>>>>>>>>>> >>>>>   // Return all entries in the map with keys <=
> limit.
> > >>>>>>>>>>>>> Returned elements are sorted by the key.
> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K
> > >>>>>>>>>>>>> limit);
> > >>>>>>>>>>>>> >>>>>
> > >>>>>>>>>>>>> >>>>>  // Remove all values with the given key;
> > >>>>>>>>>>>>> >>>>>   void remove(K key);
> > >>>>>>>>>>>>> >>>>>  // Remove all entries in the map with keys <= limit.
> > >>>>>>>>>>>>> >>>>>   void removeUntil(K limit);
> > >>>>>>>>>>>>> >>>>
> > >>>>>>>>>>>>> >>>> Will removeUntilExcl(K limit) also be useful? It will
> remove
> > >>>>>>>>>>>>> all entries in the map with keys < limit.
> > >>>>>>>>>>>>> >>>>
> > >>>>>>>>>>>>> >>>>>
> > >>>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of the
> key.
> > >>>>>>>>>>>>> In order to make this easier for users, I propose that we
> introduce a new
> > >>>>>>>>>>>>> tag on Coders PreservesOrder. A Coder that contains this
> tag guarantees
> > >>>>>>>>>>>>> that the encoded value preserves the same ordering as the
> base Java type.
> > >>>>>>>>>>>>> >>>>
> > >>>>>>>>>>>>> >>>>
> > >>>>>>>>>>>>> >>>> Could you clarify what is  "encoded value preserves
> the
> > >>>>>>>>>>>>> same ordering as the base Java type"?
> > >>>>>>>>>>>>> >>>
> > >>>>>>>>>>>>> >>>
> > >>>>>>>>>>>>> >>> Let's say A and B represent two different instances of
> the
> > >>>>>>>>>>>>> same Java type like a double, then A < B (using the
> languages comparison
> > >>>>>>>>>>>>> operator) iff encode(A) < encode(B) (note the encoded
> versions are compared
> > >>>>>>>>>>>>> lexicographically)
> > >>>>>>>>>>>>> >>
> > >>>>>>>>>>>>> >>
> > >>>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect A < B
> iff
> > >>>>>>>>>>>>> e(A) < e(B) property to hold for all languages we support?
> What happens if A,
> > >>>>>>>>>>>>> B sort differently in different languages?
> > >>>>>>>>>>>>> >
> > >>>>>>>>>>>>> >
> > >>>>>>>>>>>>> > That would have to be the property of the coder (which
> means
> > >>>>>>>>>>>>> that this property probably needs to be represented in the
> portability
> > >>>>>>>>>>>>> representation of the coder). I imagine the common use
> cases will be for
> > >>>>>>>>>>>>> simple coders like int, long, string, etc., which are
> likely to sort the
> > >>>>>>>>>>>>> same in most languages.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The standard coders for both double and integral types do
> not
> > >>>>>>>>>>>>> respect
> > >>>>>>>>>>>>> the natural ordering (consider negative values). KV coders
> > >>>>>>>>>>>>> violate the
> > >>>>>>>>>>>>> "natural" lexicographic ordering on components as well. I
> think
> > >>>>>>>>>>>>> implicitly sorting on encoded value would yield many
> > >>>>>>>>>>>>> surprises. (The
> > >>>>>>>>>>>>> state, of course, could take an order-preserving, bytes
> > >>>>>>>>>>>>> (string?)-producing callable as a parameter).
> (As for
> > >>>>>>>>>>>>> naming, I'd probably call this OrderedBagState or something
> > >>>>>>>>>>>>> like
> > >>>>>>>>>>>>> that...rather than Map which tends to imply random access.)
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> >
>
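
The point quoted above, that encoded integral values do not sort like the values themselves once negatives are involved, can be illustrated with a small sketch. It uses plain big-endian two's complement as a stand-in encoding, which is an assumption for the example and not Beam's actual coder wire format; the class and method names are hypothetical. Flipping the sign bit before encoding is one common way to make unsigned lexicographic byte order agree with numeric order.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration; the encodings here are not Beam's coder formats.
class OrderPreservingLongEncoding {
  // Plain big-endian two's complement encoding (ByteBuffer defaults to big-endian).
  static byte[] encodePlain(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  // Flip the sign bit first so that byte order matches numeric order.
  static byte[] encodeOrdered(long value) {
    return encodePlain(value ^ Long.MIN_VALUE);
  }

  // Lexicographic comparison that treats each byte as unsigned, which is how
  // a store that sorts on encoded bytes would compare two keys.
  static int compareEncoded(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // Numerically -1 < 1, but the plain encoding sorts -1 after 1.
    System.out.println(compareEncoded(encodePlain(-1L), encodePlain(1L)) > 0);     // true
    // With the sign bit flipped, encoded order agrees with numeric order.
    System.out.println(compareEncoded(encodeOrdered(-1L), encodeOrdered(1L)) < 0); // true
  }
}
```

This only covers fixed-width integers. Variable-length encodings, doubles and composite keys such as KV each need their own treatment, which is part of why the thread leans toward restricting sort keys to timestamps.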

Re: [External] Re: DISCUSS: Sorted MapState API

Posted by Tyson Hamilton <ty...@google.com>.
Hi Reuven,

I noticed that there was an implementation of the in-memory OrderedListState introduced [1]. Where can I find out more regarding the plan and design? Is there a design doc? I'd like to know more details about the implementation to see if it fits my use case. I was hoping it would have a remove(TimestampedValue<T> e) method.

Thanks,
-Tyson


[1]: https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41


On 2020/08/03 21:41:46, Catlyn Kong <ca...@yelp.com> wrote: 
> Hey folks,
> 
> Sry I'm late to this thread but this might be very helpful for the problem
> we're dealing with. Do we have a design doc or a jira ticket I can follow?
> 
> Cheers,
> Catlyn
> 
> On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský <je...@seznam.cz> wrote:
> 
> > My questions were just an example. I fully agree there is a fundamental
> > need for a sorted state (of some form, and I also think this links to
> > efficient implementation of retractions) - I was reacting to Kenn's question
> > about BIP. This one would be pretty nice example why it would be good to
> > have such a "process" - not everything can be solved on ML and there are
> > fundamental decisions that might need a closer attention.
> > On 6/18/20 5:28 PM, Reuven Lax wrote:
> >
> > Jan - my proposal is exactly TimeSortedBagState (more accurately -
> > TimeSortedListState), though I went a bit further and also proposed a way
> > to have a dynamic number of tagged TimeSortedBagStates.
> >
> > You are correct that the runner doesn't really have to store the data time
> > sorted - what's actually needed is the ability to fetch and remove
> > timestamp ranges of data (though that does include fetching the entire
> > list); TimeOrderedState is probably a more accurate name than
> > TimeSortedState. I don't think we could get away with operations that only
> > act on the smallest timestamp, however we could limit the API to only being
> > able to fetch and remove prefixes of data (ordered by timestamp). However
> > if we support prefixes, we might as well support arbitrary subranges.
> >
> > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <je...@seznam.cz> wrote:
> >
> >> Big +1 for a BIP, as this might really help clarify all the pros and cons
> >> of all possibilities. There seem to be questions that need answering and
> >> motivating use cases - do we need sorted map state or can we solve our use
> >> cases by something simpler - e.g. the mentioned TimeSortedBagState? Does
> >> that really have to be time-sorted structure, or does it "only" have to
> >> have operations that can efficiently find and remove element with smallest
> >> timestamp (like a PriorityQueue)?
> >>
> >> Jan
> >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
> >>
> >> Zooming in from generic philosophy to be clear: adding time ordered
> >> buffer to the Fn state API is *not* a shortcut. It has benefits that will
> >> not be achieved by SDK-side implementation on top of either ordered or
> >> unordered multimap. Are those benefits worth expanding the API? I don't
> >> know.
> >>
> >> A change to allow a runner to have a specialized implementation for
> >> time-buffered state would be one or more StateKey types, right? Reuven,
> >> maybe put this and your Java API in a doc? A BIP? Seems like there's at
> >> least the following to explore:
> >>
> >>  - how that Java API would map to an SDK-side implementation on top of
> >> multimap state key
> >>  - how that Java API would map to a new StateKey
> >>  - whether there's actually more than one relevant implementation of that
> >> StateKey
> >>  - whether SDK-side implementation on some other state key would be
> >> performant enough in all SDK languages (present and future)
> >>
> >> Zooming back out to generic philosophy: Proliferation of StateKey
> >> types tuned by runners (which can very easily still share implementation)
> >> is probably better than proliferation of complex SDK-side implementations
> >> with varying completeness and performance.
> >>
> >> Kenn
> >>
> >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <re...@google.com> wrote:
> >>
> >>> It might help for me to describe what I have in mind. I'm still
> >>> proposing that we build multimap, just not a globally-sorted multimap.
> >>>
> >>> My previous proposal was that we provide a Multimap<Key, Value> state
> >>> type, sorted by key. This would have two additional operations -
> >>> multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey,
> >>> endKey). The primary use case was timestamp sorting, but I felt that a
> >>> sorted multimap provided a nice generalization - after all, you can simply
> >>> key the multimap by timestamp to get timestamp sorting.
> >>>
> >>> This approach had some issues immediately that would take some work to
> >>> solve. Since a multimap key can have any type and a runner will only be
> >>> able to sort by encoded type, we would need to introduce a concept of
> >>> order-preserving coders into Beam and plumb that through. Robert pointed
> >>> out that even our existing standard coders for simple integral types don't
> >>> preserve order, so there will likely be surprises here.
> >>>
> >>> My current proposal is for a multimap that is not sorted by key, but
> >>> that can support ordered values for a single key. Remember that a multimap
> >>> maps K -> Iterable<V>, so this means that each individual Iterable<V> is
> >>> ordered, but the keys have no specific order relative to each other. This
> >>> is not too different from many multimap implementations where the keys are
> >>> unordered, but the list of values for a single key at least has a stable
> >>> order.
> >>>
> >>> The interface would look like this:
> >>>
> >>> public interface MultimapState<K, V> extends State {
> >>>   // Add a value with a default timestamp.
> >>>   void put(K key, V value);
> >>>
> >>>   // Add a timestamped value.
> >>>   void put(K key, TimestampedValue<V> value);
> >>>
> >>>   // Remove all values for a key.
> >>>   void remove(K key);
> >>>
> >>>   // Remove all values for a key with timestamps within the specified
> >>>   // range.
> >>>   void removeRange(K key, Instant startTs, Instant endTs);
> >>>
> >>>   // Get an Iterable of values for a key. The Iterable will be returned
> >>>   // sorted by timestamp.
> >>>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
> >>>
> >>>   // Get an Iterable of values for a key in the specified range. The
> >>>   // Iterable will be returned sorted by timestamp.
> >>>   ReadableState<Iterable<TimestampedValue<V>>> getRange(
> >>>       K key, Instant startTs, Instant endTs);
> >>>
> >>>   ReadableState<Iterable<K>> keys();
> >>>   ReadableState<Iterable<TimestampedValue<V>>> values();
> >>>   ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
> >>> }
> >>>
> >>> We can of course provide helper functions that allow using MultimapState
> >>> without dealing with TimestampedValue for users who only want a multimap and
> >>> don't want sorting.
> >>>
> >>> I think many users will only need a single sorted list - not a full
> >>> multimap. It's worth offering this as well, and we can simply build it on
> >>> top of MultimapState. It will look like an extension of BagState:
> >>>
> >>> public interface TimestampSortedListState<T> extends State {
> >>>   void add(TimestampedValue<T> value);
> >>>   Iterable<TimestampedValue<T>> read();
> >>>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant
> >>> endTs);
> >>>   void clearRange(Instant startTs, Instant endTs);
> >>> }
> >>>
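The TimestampSortedListState proposed above can be sketched in memory as follows. This is only an illustration of the intended semantics, not Beam code: the class name, the Stamped helper, the use of java.time.Instant in place of Beam's Instant, and the half-open [startTs, endTs) range are all assumptions, since the proposal does not say whether range bounds are inclusive.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the proposed TimestampSortedListState. */
class InMemoryTimestampSortedList<T> {
  /** Minimal stand-in for a timestamped value (assumption, not Beam's class). */
  static final class Stamped<T> {
    final Instant timestamp;
    final T value;

    Stamped(Instant timestamp, T value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // TreeMap keeps timestamps ordered; one timestamp may hold several values.
  private final TreeMap<Instant, List<T>> byTimestamp = new TreeMap<>();

  void add(Instant timestamp, T value) {
    byTimestamp.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(value);
  }

  /** Returns every value, ordered by timestamp. */
  List<Stamped<T>> read() {
    return flatten(byTimestamp);
  }

  /** Assumption: the range is half-open, [startTs, endTs). */
  List<Stamped<T>> readRange(Instant startTs, Instant endTs) {
    return flatten(byTimestamp.subMap(startTs, true, endTs, false));
  }

  /** Removes every value whose timestamp falls in [startTs, endTs). */
  void clearRange(Instant startTs, Instant endTs) {
    byTimestamp.subMap(startTs, true, endTs, false).clear();
  }

  private static <T> List<Stamped<T>> flatten(Map<Instant, List<T>> view) {
    List<Stamped<T>> out = new ArrayList<>();
    for (Map.Entry<Instant, List<T>> e : view.entrySet()) {
      for (T v : e.getValue()) {
        out.add(new Stamped<>(e.getKey(), v));
      }
    }
    return out;
  }
}
```

Values added out of order come back sorted from read(), and clearRange drops a timestamp prefix without rewriting the rest of the list, which is the access pattern the thread is asking runners to support.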
> >>>
> >>> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <lc...@google.com> wrote:
> >>>
> >>>> The portability layer is meant to live across multiple versions of Beam
> >>>> and I don't think it should be treated by doing the simple and useful thing
> >>>> now since I believe it will lead to a proliferation of the API.
> >>>>
> >>>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <ke...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> I have thoughts on the subject of whether to have APIs just for the
> >>>>> lowest-level building blocks versus having APIs for higher-level
> >>>>> constructs. Specifically this applies to providing only unsorted multimap
> >>>>> vs what I will call "time-ordered buffer". TL;DR: I'd vote to focus on
> >>>>> time-ordered buffer; if it turns out to be easy to go all the way to sorted
> >>>>> multimap that's nice-to-have; if it turns out to be easy to implement on
> >>>>> top of unsorted map state that should probably be under the hood.
> >>>>>
> >>>>> Reasons to build low-level multimap in the runner & fn api and layer
> >>>>> higher-level things in the SDK:
> >>>>>
> >>>>>  - It is less implementation for runners if they have to only provide
> >>>>> fewer lower-level building blocks like multimap state.
> >>>>>  - There are many more runners than SDKs (and will be even more and
> >>>>> more) so this saves overall.
> >>>>>
> >>>>> Reasons to build higher-level constructs directly in the runner and fn
> >>>>> api:
> >>>>>
> >>>>>  - Having multiple higher-level state types may actually be less
> >>>>> implementation than one complex state type, especially if they map to
> >>>>> runner primitives.
> >>>>>  - The runner may have better specialized implementations, especially
> >>>>> for something like a time-ordered buffer.
> >>>>>  - The particular access patterns in an SDK-based implementation may
> >>>>> not be ideal for each runner's underlying implementation of the low-level
> >>>>> building block.
> >>>>>  - There may be excessive gRPC overhead even for optimal access
> >>>>> patterns.
> >>>>>
> >>>>> There are ways to have best of both worlds, like:
> >>>>>
> >>>>> 1. Define multiple state types according to fundamental access
> >>>>> patterns, like we did before portability.
> >>>>> 2. If it is easy to layer one on top of the other, do that inside the
> >>>>> runner. Provide shared code so that runners providing the lowest-level
> >>>>> primitive get all the types for free.
> >>>>>
> >>>>> I understand that this is an oversimplification. It still creates some
> >>>>> more work. And APIs are a burden so it is good to introduce as few as
> >>>>> possible for maintenance. But it has performance benefits and also unblocks
> >>>>> "just doing the simple and useful thing now" which I always like to do as
> >>>>> long as it is compatible with future changes. If the APIs are fundamental,
> >>>>> like sets, maps, timestamp ordering, then it is safe to guess that they
> >>>>> will change rarely and be useful forever.
> >>>>>
> >>>>> Kenn
> >>>>>
> >>>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <lc...@google.com> wrote:
> >>>>>
> >>>>>> I would be glad to take a stab at how to provide sorting on top of
> >>>>>> unsorted multimap state.
> >>>>>> Based upon your description, you want integer keys representing
> >>>>>> timestamps and arbitrary user value for the values, is that correct?
> >>>>>> What kinds of operations do you need on the sorted map state in order
> >>>>>> of efficiency requirements?
> >>>>>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)))
> >>>>>> What kinds of operations do we expect the underlying unsorted map
> >>>>>> state to be able to provide?
> >>>>>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g.
> >>>>>> enumerate(K)?)
> >>>>>>
> >>>>>> I went through a similar exercise of how to provide a list like side
> >>>>>> input view over a multimap[1] side input which efficiently allowed
> >>>>>> computation of size and provided random access while only having access to
> >>>>>> get(K) and enumerate K's.
> >>>>>>
> >>>>>> 1:
> >>>>>> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
> >>>>>>
> >>>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <re...@google.com> wrote:
> >>>>>>
> >>>>>>> Bringing this subject up again,
> >>>>>>>
> >>>>>>> I've spent some time looking into implementing this for the Dataflow
> >>>>>>> runner. I'm unable to find a way to implement the arbitrary sorted multimap
> >>>>>>> efficiently for the case where there are large numbers of unique keys.
> >>>>>>> Since the primary driving use case is timestamp ordering (i.e. key is event
> >>>>>>> timestamp), you would expect to have nearly a new key per element. I
> >>>>>>> considered Luke's suggestion above, but unfortunately it doesn't really
> >>>>>>> solve this issue.
> >>>>>>>
> >>>>>>> The primary use case for sorting always seems to be sorting by
> >>>>>>> timestamp. I want to propose that instead of building the fully-general
> >>>>>>> sorted multimap, we instead focus on a state type where the sort key is an
> >>>>>>> integral type (like a timestamp or an integer). There is still a valid use
> >>>>>>> case for multimap, but we can provide that as an unordered state. At least
> >>>>>>> for Dataflow, it will be much easier.
> >>>>>>>
> >>>>>>> While my difficulties here may be specific to the Dataflow runner,
> >>>>>>> any such support would have to be built into other runners as well, and
> >>>>>>> limiting to integral sorting likely makes it easier for other runners to
> >>>>>>> implement this. Also, if you look at this
> >>>>>>> <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95> Flink
> >>>>>>> comment pointed out by Aljoscha, for Flink the main use case identified was
> >>>>>>> also timestamp sorting. This will also simplify the API design for this
> >>>>>>> feature: Sorted multimap with arbitrary keys would require us to introduce
> >>>>>>> a way of mapping natural ordering to encoded ordering (i.e. a new
> >>>>>>> OrderPreservingCoder), but if we limit sort keys to integral types, the API
> >>>>>>> design is simpler as integral types can be represented directly.
> >>>>>>>
> >>>>>>> Reuven
> >>>>>>>
> >>>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <re...@google.com> wrote:
> >>>>>>>
> >>>>>>>> This sounds to me like a potential runner strategy. However if a
> >>>>>>>> runner can natively support sorted maps (e.g. we expect the Dataflow runner
> >>>>>>>> to be able to do so, and I think it would be useful for other runners as
> >>>>>>>> well), then it's probably preferable to allow the runner to use its native
> >>>>>>>> capabilities.
> >>>>>>>>
> >>>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <lc...@google.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> For the API that you proposed, the map key is always "void" and
> >>>>>>>>> the sort key == user key. So in my example of
> >>>>>>>>> key: dummy value
> >>>>>>>>> key.000: token, (0001, value4)
> >>>>>>>>> key.001: token, (0010, value1), (0011, value2)
> >>>>>>>>> key.01: token
> >>>>>>>>> key.1: token, (1011, value3)
> >>>>>>>>> you would have:
> >>>>>>>>> "void": dummy value
> >>>>>>>>> "void".000: token, (0001, value4)
> >>>>>>>>> "void".001: token, (0010, value1), (0011, value2)
> >>>>>>>>> "void".01: token
> >>>>>>>>> "void".1: token, (1011, value3)
> >>>>>>>>>
> >>>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking
> >>>>>>>>> the prefixes until you find a common prefix for K and then filter for
> >>>>>>>>> values where they have a sort key <= K. Using the example above, to find
> >>>>>>>>> entriesUntil(0010) you would:
> >>>>>>>>> look for key."", miss
> >>>>>>>>> look for key.0, miss
> >>>>>>>>> look for key.00, miss
> >>>>>>>>> look for key.000, hit, sort all contained values using secondary
> >>>>>>>>> key, provide value4 to user
> >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we
> >>>>>>>>> sort all contained values using secondary key, filter out value2 and
> >>>>>>>>> provide value1
> >>>>>>>>>
> >>>>>>>>> void removeUntil(K limit) also translates into walking the
> >>>>>>>>> prefixes but instead we will clear them when we have a "hit" with some
> >>>>>>>>> special logic for when the sort key is a prefix of the key. Using the
> >>>>>>>>> example, to removeUntil(0010) you would:
> >>>>>>>>> look for key."", miss
> >>>>>>>>> look for key.0, miss
> >>>>>>>>> look for key.00, miss
> >>>>>>>>> look for key.000, hit, clear
> >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we
> >>>>>>>>> sort all contained values using secondary key, store in memory all values
> >>>>>>>>> that are > 0010, clear and append the values stored in memory.
> >>>>>>>>>
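The two walks described above (entriesUntil and removeUntil) can be sketched like this. It is a simplified illustration under several assumptions: sort keys are fixed-width bit strings, a plain HashMap stands in for unsorted map state, presence of a prefix in the map plays the role of the "token", and the per-key dummy value is omitted. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of range reads and removals over prefix-keyed leaves. */
class PrefixRangeOps {
  /** Builds the leaf layout from the example; each entry is {sortKey, value}. */
  static Map<String, List<String[]>> exampleLeaves() {
    Map<String, List<String[]>> leaves = new HashMap<>();
    leaves.put("000", listOf(new String[] {"0001", "value4"}));
    leaves.put("001", listOf(new String[] {"0010", "value1"}, new String[] {"0011", "value2"}));
    leaves.put("01", listOf());
    leaves.put("1", listOf(new String[] {"1011", "value3"}));
    return leaves;
  }

  private static List<String[]> listOf(String[]... entries) {
    List<String[]> out = new ArrayList<>();
    for (String[] e : entries) {
      out.add(e);
    }
    return out;
  }

  /** Returns values whose sort key is <= limit, in sort-key order. */
  static List<String> entriesUntil(Map<String, List<String[]>> leaves, String limit) {
    List<String> out = new ArrayList<>();
    visit(leaves, "", limit, out, false);
    return out;
  }

  /** Removes every entry whose sort key is <= limit; emptied leaves stay as markers. */
  static void removeUntil(Map<String, List<String[]>> leaves, String limit) {
    visit(leaves, "", limit, new ArrayList<>(), true);
  }

  private static void visit(Map<String, List<String[]>> leaves, String prefix,
      String limit, List<String> out, boolean remove) {
    // Every sort key under this prefix is greater than the limit: prune the subtree.
    if (prefix.compareTo(limit.substring(0, prefix.length())) > 0) {
      return;
    }
    List<String[]> leaf = leaves.get(prefix);
    if (leaf == null) { // miss: this is an internal node, descend in sort order
      visit(leaves, prefix + "0", limit, out, remove);
      visit(leaves, prefix + "1", limit, out, remove);
      return;
    }
    // Hit: sort the leaf by sort key, emit entries <= limit, keep the rest.
    List<String[]> sorted = new ArrayList<>(leaf);
    sorted.sort(Comparator.comparing((String[] e) -> e[0]));
    List<String[]> survivors = new ArrayList<>();
    for (String[] e : sorted) {
      if (e[0].compareTo(limit) <= 0) {
        out.add(e[1]);
      } else {
        survivors.add(e);
      }
    }
    if (remove) {
      leaves.put(prefix, survivors); // "clear and append" the surviving values
    }
  }
}
```

With the example layout, entriesUntil(leaves, "0010") visits 000 and 001 and skips 01 and 1, yielding value4 then value1; removeUntil(leaves, "0010") leaves 000 empty and 001 holding only value2.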
> >>>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <re...@google.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Can you explain how fetching and deleting ranges of keys would
> >>>>>>>>>> work with this data structure?
> >>>>>>>>>>
> >>>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <lc...@google.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Reuven, for the example, I assume that we never want to store
> >>>>>>>>>>> more than 2 values at a given sort key prefix, and if we do then we will
> >>>>>>>>>>> create a new longer prefix splitting up the values based upon the sort key.
> >>>>>>>>>>>
> >>>>>>>>>>> Tuple representation in examples below is (key, sort key, value)
> >>>>>>>>>>> and . is a character outside of the alphabet which can be represented by
> >>>>>>>>>>> using an escaping encoding that wraps the key + sort key encoding.
> >>>>>>>>>>>
> >>>>>>>>>>> To insert (key, 0010, value1), we lookup "key" + all the
> >>>>>>>>>>> prefixes of 0010 finding one that is not empty. In this case there are none,
> >>>>>>>>>>> so we append the value to the map at key."" ending up with (we also set the key to any
> >>>>>>>>>>> dummy value to know that it contains values):
> >>>>>>>>>>> key: dummy value
> >>>>>>>>>>> key."": token, (0010, value1)
> >>>>>>>>>>> Now we insert (key, 0011, value2), we again lookup "key" + all
> >>>>>>>>>>> the prefixes of 0011, finding "", so we append value2 to key."" ending up
> >>>>>>>>>>> with:
> >>>>>>>>>>> key: dummy value
> >>>>>>>>>>> key."": token, (0010, value1), (0011, value2)
> >>>>>>>>>>> Now we insert (key, 1011, value3), we again lookup "key" + all
> >>>>>>>>>>> the prefixes of 1011 finding "" but notice that it is full, so we partition
> >>>>>>>>>>> all the values into two prefixes 0 and 1. We also clear the "" prefix
> >>>>>>>>>>> ending up with:
> >>>>>>>>>>> key: dummy value
> >>>>>>>>>>> key.0: token, (0010, value1), (0011, value2)
> >>>>>>>>>>> key.1: token, (1011, value3)
> >>>>>>>>>>> Now we insert (key, 0001, value4), we again lookup "key" + all
> >>>>>>>>>>> the prefixes of the value finding 0 but notice that it is full, so we
> >>>>>>>>>>> partition all the values into two prefixes 00 and 01 but notice this
> >>>>>>>>>>> doesn't help us since 00 will be too full so we split 00 again to 000, 001.
> >>>>>>>>>>> We also clear the 0 prefix ending up with:
> >>>>>>>>>>> key: dummy value
> >>>>>>>>>>> key.000: token, (0001, value4)
> >>>>>>>>>>> key.001: token, (0010, value1), (0011, value2)
> >>>>>>>>>>> key.01: token
> >>>>>>>>>>> key.1: token, (1011, value3)
> >>>>>>>>>>>
> >>>>>>>>>>> We are effectively building a trie[1] where we only have values
> >>>>>>>>>>> at the leaves and control how full each leaf can be. There are other trie
> >>>>>>>>>>> representations like a radix tree that may be better.
> >>>>>>>>>>>
> >>>>>>>>>>> Looking up the values in sorted order for "key" would go like
> >>>>>>>>>>> this:
> >>>>>>>>>>> Is key set, yes
> >>>>>>>>>>> look for key."", miss
> >>>>>>>>>>> look for key.0, miss
> >>>>>>>>>>> look for key.00, miss
> >>>>>>>>>>> look for key.000, hit, sort all contained values using secondary
> >>>>>>>>>>> key, provide value4 to user
> >>>>>>>>>>> look for key.001, hit, sort all contained values using secondary
> >>>>>>>>>>> key, provide value1 followed by value2 to user
> >>>>>>>>>>> look for key.01, hit, empty, return no values to user
> >>>>>>>>>>> look for key.1, hit, sort all contained values using secondary
> >>>>>>>>>>> key, provide value3 to user
> >>>>>>>>>>> we have walked the entire prefix space, signal end of iterable
> >>>>>>>>>>>
> >>>>>>>>>>> Some notes for the above:
> >>>>>>>>>>> * The dummy value is used to know that the key contains values
> >>>>>>>>>>> and the token is to know whether there are any values deeper in the trie so
> >>>>>>>>>>> we know when to stop searching.
> >>>>>>>>>>> * If we can recalculate the sort key from the combination of the
> >>>>>>>>>>> key and value, then we don't need to store it.
> >>>>>>>>>>> * Keys with lots of values will perform worse than keys with
> >>>>>>>>>>> fewer values since we have to look up more keys but they will be empty
> >>>>>>>>>>> reads. The number of misses can be controlled by how many elements we are
> >>>>>>>>>>> willing to store at a given node before we subdivide.
> >>>>>>>>>>>
> >>>>>>>>>>> In reality you could build a lot of structures (e.g. red black
> >>>>>>>>>>> tree, binary tree) using the sort key, the issue is the cost of
> >>>>>>>>>>> rebalancing/re-organizing the structure in map form and whether it has a
> >>>>>>>>>>> convenient pre-order traversal for lookups.
> >>>>>>>>>>>
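The insert-and-split procedure and the sorted read described above can be sketched as follows. This is a simplified illustration, not the author's implementation: a HashMap stands in for unsorted map state, sort keys are assumed to be fixed-width bit strings, presence of a prefix in the map plays the role of the "token", the per-key dummy value is omitted, and a root leaf "" is created up front. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of a prefix trie with bounded leaves over an unsorted map. */
class PrefixBucketSortedState {
  static final int MAX_PER_LEAF = 2;

  static final class Entry {
    final String sortKey; // fixed-width bit string such as "0010" (assumption)
    final String value;

    Entry(String sortKey, String value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  // Stands in for unsorted map state: only exact-key get, put and remove are used.
  private final Map<String, List<Entry>> leaves = new HashMap<>();

  PrefixBucketSortedState() {
    leaves.put("", new ArrayList<>()); // the root leaf covers every sort key
  }

  void insert(String sortKey, String value) {
    String prefix = findLeaf(sortKey);
    // Split full leaves until the target leaf has room or cannot be split further.
    while (leaves.get(prefix).size() >= MAX_PER_LEAF && prefix.length() < sortKey.length()) {
      List<Entry> old = leaves.remove(prefix);
      leaves.put(prefix + "0", new ArrayList<>());
      leaves.put(prefix + "1", new ArrayList<>());
      for (Entry e : old) {
        leaves.get(e.sortKey.substring(0, prefix.length() + 1)).add(e);
      }
      prefix = sortKey.substring(0, prefix.length() + 1);
    }
    leaves.get(prefix).add(new Entry(sortKey, value));
  }

  // Probe "", "0", "00", ... until a stored leaf is hit.
  private String findLeaf(String sortKey) {
    for (int len = 0; len <= sortKey.length(); len++) {
      String prefix = sortKey.substring(0, len);
      if (leaves.containsKey(prefix)) {
        return prefix;
      }
    }
    throw new IllegalStateException("no leaf covers " + sortKey);
  }

  /** Pre-order walk of the prefix space, sorting within each leaf. */
  List<String> valuesInOrder() {
    List<String> out = new ArrayList<>();
    walk("", out);
    return out;
  }

  private void walk(String prefix, List<String> out) {
    List<Entry> leaf = leaves.get(prefix);
    if (leaf == null) { // miss: internal node, so both children exist below it
      walk(prefix + "0", out);
      walk(prefix + "1", out);
      return;
    }
    List<Entry> sorted = new ArrayList<>(leaf);
    sorted.sort(Comparator.comparing((Entry e) -> e.sortKey));
    for (Entry e : sorted) {
      out.add(e.value);
    }
  }

  /** Exposes the stored leaf prefixes, for inspection only. */
  Set<String> leafPrefixes() {
    return new TreeSet<>(leaves.keySet());
  }
}
```

Inserting (0010, value1), (0011, value2), (1011, value3) and (0001, value4) in that order ends with the leaves 000, 001, 01 and 1 from the walkthrough, and valuesInOrder() returns value4, value1, value2, value3.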
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <re...@google.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Some great comments!
> >>>>>>>>>>>>
> >>>>>>>>>>>> *Aljoscha*: absolutely this would have to be implemented by
> >>>>>>>>>>>> runners to be efficient. We can of course provide a default (inefficient)
> >>>>>>>>>>>> implementation, but ideally runners would provide better ones.
> >>>>>>>>>>>>
> >>>>>>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed by
> >>>>>>>>>>>> this. E.g.
> >>>>>>>>>>>>
> >>>>>>>>>>>> *Robert* Great point about standard coders not satisfying
> >>>>>>>>>>>> this. That's why I suggested that we provide a way to tag the coders that
> >>>>>>>>>>>> do preserve order, and only accept those as key coders. Alternatively we
> >>>>>>>>>>>> could present a more limited API - e.g. only allowing a hard-coded set of
> >>>>>>>>>>>> types to be used as keys - but that seems counter to the direction Beam
> >>>>>>>>>>>> usually goes. So users will have two ways of creating multimap state specs:
> >>>>>>>>>>>>
> >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>> state =
> >>>>>>>>>>>> StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
> >>>>>>>>>>>>
> >>>>>>>>>>>> or
> >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>> state =
> >>>>>>>>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
> >>>>>>>>>>>>
> >>>>>>>>>>>> The second one will validate that the key coder preserves
> >>>>>>>>>>>> order, and fails otherwise (similar to coder determinism checking in
> >>>>>>>>>>>> GroupByKey). (BTW we would also have versions of these functions that use
> >>>>>>>>>>>> coder inference to "guess" the coder, but those will do the same checking)
> >>>>>>>>>>>>
> >>>>>>>>>>>> Also the API I proposed did support random access! We could
> >>>>>>>>>>>> separate out OrderedBagState again if we think the use cases are
> >>>>>>>>>>>> fundamentally different. I merged the proposal into that of MultimapState
> >>>>>>>>>>>> because there seemed to be 99% overlap.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Reuven
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <
> >>>>>>>>>>>> robertwb@google.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <re...@google.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>> >
> >>>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <
> >>>>>>>>>>>>> altay@google.com> wrote:
> >>>>>>>>>>>>> >>
> >>>>>>>>>>>>> >>
> >>>>>>>>>>>>> >>
> >>>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <
> >>>>>>>>>>>>> lcwik@google.com> wrote:
> >>>>>>>>>>>>> >>>
> >>>>>>>>>>>>> >>>
> >>>>>>>>>>>>> >>>
> >>>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <
> >>>>>>>>>>>>> ruwang@google.com> wrote:
> >>>>>>>>>>>>> >>>>>
> >>>>>>>>>>>>> >>>>> A few obvious problems with this code:
> >>>>>>>>>>>>> >>>>>   1. Removing the elements already processed from the
> >>>>>>>>>>>>> bag requires clearing and rewriting the entire bag. This is O(n^2) in the
> >>>>>>>>>>>>> number of input trades.
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>> Why isn't it O(2 * n) to clear and rewrite the trade
> >>>>>>>>>>>>> state?
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>>>
> >>>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends State
> >>>>>>>>>>>>> {
> >>>>>>>>>>>>> >>>>>   // Add a value to the map.
> >>>>>>>>>>>>> >>>>>   void put(K key, V value);
> >>>>>>>>>>>>> >>>>>   // Get all values for a given key.
> >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
> >>>>>>>>>>>>> >>>>>  // Return all entries in the map.
> >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
> >>>>>>>>>>>>> >>>>>   // Return all entries in the map with keys <= limit.
> >>>>>>>>>>>>> returned elements are sorted by the key.
> >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K
> >>>>>>>>>>>>> limit);
> >>>>>>>>>>>>> >>>>>
> >>>>>>>>>>>>> >>>>>   // Remove all values with the given key.
> >>>>>>>>>>>>> >>>>>   void remove(K key);
> >>>>>>>>>>>>> >>>>>  // Remove all entries in the map with keys <= limit.
> >>>>>>>>>>>>> >>>>>   void removeUntil(K limit);
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>> Will removeUntilExcl(K limit) also be useful? It will remove
> >>>>>>>>>>>>> all entries in the map with keys < limit.
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>>>
> >>>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of the key.
> >>>>>>>>>>>>> In order to make this easier for users, I propose that we introduce a new
> >>>>>>>>>>>>> tag on Coders PreservesOrder. A Coder that contains this tag guarantees
> >>>>>>>>>>>>> that the encoded value preserves the same ordering as the base Java type.
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>> Could you clarify what is  "encoded value preserves the
> >>>>>>>>>>>>> same ordering as the base Java type"?
> >>>>>>>>>>>>> >>>
> >>>>>>>>>>>>> >>>
> >>>>>>>>>>>>> >>> Let's say A and B represent two different instances of the
> >>>>>>>>>>>>> same Java type like a double, then A < B (using the language's comparison
> >>>>>>>>>>>>> operator) iff encode(A) < encode(B) (note the encoded versions are compared
> >>>>>>>>>>>>> lexicographically)
> >>>>>>>>>>>>> >>
> >>>>>>>>>>>>> >>
> >>>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect A < B iff
> >>>>>>>>>>>>> e(A) < e(B) property to hold for all languages we support? What happens if A,
> >>>>>>>>>>>>> B sort differently in different languages?
> >>>>>>>>>>>>> >
> >>>>>>>>>>>>> >
> >>>>>>>>>>>>> > That would have to be the property of the coder (which means
> >>>>>>>>>>>>> that this property probably needs to be represented in the portability
> >>>>>>>>>>>>> representation of the coder). I imagine the common use cases will be for
> >>>>>>>>>>>>> simple coders like int, long, string, etc., which are likely to sort the
> >>>>>>>>>>>>> same in most languages.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The standard coders for both double and integral types do not
> >>>>>>>>>>>>> respect
> >>>>>>>>>>>>> the natural ordering (consider negative values). KV coders
> >>>>>>>>>>>>> violate the
> >>>>>>>>>>>>> "natural" lexicographic ordering on components as well. I think
> >>>>>>>>>>>>> implicitly sorting on encoded value would yield many
> >>>>>>>>>>>>> surprises. (The
> >>>>>>>>>>>>> state, of course, could take an order-preserving, bytes
> >>>>>>>>>>>>> (string?)-producing callable as a parameter). (As for
> >>>>>>>>>>>>> naming, I'd probably call this OrderedBagState or something
> >>>>>>>>>>>>> like
> >>>>>>>>>>>>> that...rather than Map which tends to imply random access.)
> >>>>>>>>>>>>>
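The point about negative values can be shown with a small sketch. It uses plain big-endian two's complement purely as an illustration of an encoding that does not preserve order; it is not meant to describe what any particular Beam coder emits. The sign-bit flip shown next to it is one common way to get an order-preserving encoding for longs. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: which long encodings sort correctly as raw bytes. */
class OrderPreservingLongEncoding {
  // Big-endian two's complement: negative values start with a 1 bit, so as
  // unsigned bytes they compare after every non-negative value.
  static byte[] encodeBigEndian(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto
  // 0x00...00 .. 0xFF...FF in increasing order.
  static byte[] encodeOrderPreserving(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
  }

  // Lexicographic comparison of unsigned bytes, as a byte-sorting store would do.
  static int compareLexicographic(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

Comparing the encodings of -1 and 1 gives the wrong sign with encodeBigEndian and the right sign with encodeOrderPreserving, which is the kind of surprise an order-preserving coder tag or an integral-only sort key is meant to rule out.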
> >>>>>>>>>>>>
>