Posted to dev@beam.apache.org by Lukasz Cwik <lc...@google.com> on 2018/11/28 20:01:06 UTC

Handling large values

There is a discussion happening on a PR 7127[1] where Robert is working on
providing the first implementation for supporting large iterables resulting
from a GroupByKey. This is in line with the original proposal for remote
references over the Fn Data & State API[2].

I have thought about this issue more since the original write-up was done
over a year ago and believe that we can simplify the implementation by
migrating the length prefix coder to be able to embed a remote reference
token at the end of the stream if the data is too large. This allows any
coder which supports lazy decoding to return a view over a seekable stream
instead of decoding all the data (regardless of whether all the data was
sent or there is a state token representing the remote reference).
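
To make the idea concrete, here is a minimal sketch of such a frame. It is not
Beam's length prefix coder: the wire layout (4-byte inline length, inline
bytes, a 1-byte flag, then an optional length-prefixed token) and the names
StateBackedLengthPrefix, Frame and maxInline are assumptions made up for
illustration, since the thread does not specify a format.

```java
import java.nio.ByteBuffer;

/** Hypothetical framing: [int inlineLength][inline bytes][byte hasToken][int tokenLength][token]. */
final class StateBackedLengthPrefix {
  /** One decoded frame: the bytes sent inline plus an optional remote reference. */
  static final class Frame {
    final byte[] inline;
    final byte[] stateToken; // null when the whole value was sent inline

    Frame(byte[] inline, byte[] stateToken) {
      this.inline = inline;
      this.stateToken = stateToken;
    }
  }

  /** Sends at most maxInline bytes inline; appends the token only if the value was cut short. */
  static byte[] encode(byte[] value, int maxInline, byte[] stateToken) {
    int inlineLength = Math.min(value.length, maxInline);
    boolean truncated = inlineLength < value.length;
    int size = 4 + inlineLength + 1 + (truncated ? 4 + stateToken.length : 0);
    ByteBuffer out = ByteBuffer.allocate(size);
    out.putInt(inlineLength);
    out.put(value, 0, inlineLength);
    out.put((byte) (truncated ? 1 : 0));
    if (truncated) {
      out.putInt(stateToken.length);
      out.put(stateToken);
    }
    return out.array();
  }

  /** Reads the inline bytes and, if the flag is set, the trailing token. */
  static Frame decode(byte[] encoded) {
    ByteBuffer in = ByteBuffer.wrap(encoded);
    byte[] inline = new byte[in.getInt()];
    in.get(inline);
    byte[] stateToken = null;
    if (in.get() == 1) {
      stateToken = new byte[in.getInt()];
      in.get(stateToken);
    }
    return new Frame(inline, stateToken);
  }
}
```

In this sketch a reader that finds a non-null stateToken would have to look up
the remaining bytes separately; how that lookup works is outside the sketch.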

Allowing any arbitrary coder to support lazy decoding helps solve the large
iterable use case but also opens up the ability for types which don't need
to be fully decoded to provide lazy views. Imagine our Beam rows using a
format where only rows that are read are decoded while everything else is
left in its encoded form.
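
As a rough illustration of a lazy view, the sketch below reads the row idea
above as "decode only the fields that are accessed", which is one possible
interpretation. LazyRow, its length-prefixed UTF-8 field layout and the
decodedCount counter are hypothetical and are not Beam's Row or its encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical row whose fields are length-prefixed UTF-8 strings, decoded on first access. */
final class LazyRow {
  private final ByteBuffer encoded;
  private final int[] offsets;  // position of each field's 4-byte length prefix
  private final String[] cache; // decoded fields, filled in lazily
  int decodedCount = 0;         // how many fields have actually been decoded

  LazyRow(byte[] bytes, int fieldCount) {
    encoded = ByteBuffer.wrap(bytes);
    offsets = new int[fieldCount];
    cache = new String[fieldCount];
    int position = 0;
    for (int i = 0; i < fieldCount; i++) {
      offsets[i] = position;
      // Skip the payload using only its length prefix; nothing is decoded here.
      position += 4 + encoded.getInt(position);
    }
  }

  String getField(int index) {
    if (cache[index] == null) {
      int length = encoded.getInt(offsets[index]);
      cache[index] =
          new String(encoded.array(), offsets[index] + 4, length, StandardCharsets.UTF_8);
      decodedCount++;
    }
    return cache[index];
  }

  /** Helper that builds the assumed encoding so the sketch can be exercised. */
  static byte[] encode(String... fields) {
    byte[][] payloads = new byte[fields.length][];
    int size = 0;
    for (int i = 0; i < fields.length; i++) {
      payloads[i] = fields[i].getBytes(StandardCharsets.UTF_8);
      size += 4 + payloads[i].length;
    }
    ByteBuffer out = ByteBuffer.allocate(size);
    for (byte[] payload : payloads) {
      out.putInt(payload.length).put(payload);
    }
    return out.array();
  }
}
```

Reading a single field of a three-field row leaves decodedCount at 1; the
other two fields stay in their encoded form until someone asks for them.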

I originally thought that this could also help solve an issue where
large values[3] need to be chunked across multiple protobuf messages over
the Data API, which complicates the reading-side decoding implementation
since each SDK needs to provide an implementation that blocks and waits for
the next chunk to come across for the same logical stream[4]. But there are
issues with this because the runner may make a bad coder choice such
as iterable<length_prefix<blob>> (instead of length_prefix<iterable<blob>>)
which can lead to > 2 GB of state keys if there are many, many values.
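
The "blocks and waits for the next chunk" behaviour mentioned above can be
sketched as follows. This is only an illustration under assumptions:
ChunkedInputStream, accept() and complete() are hypothetical names, and a real
SDK would feed chunks from the Data API rather than from direct method calls.

```java
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stream that presents separately delivered chunks as one logical stream. */
final class ChunkedInputStream extends InputStream {
  private static final byte[] END = new byte[0]; // sentinel marking the end of the stream
  private final BlockingQueue<byte[]> chunks = new LinkedBlockingQueue<>();
  private byte[] current = new byte[0];
  private int position = 0;
  private boolean finished = false;

  /** Called by the receiving side whenever another chunk arrives. */
  void accept(byte[] chunk) {
    chunks.add(chunk);
  }

  /** Called once after the last chunk has been delivered. */
  void complete() {
    chunks.add(END);
  }

  @Override
  public int read() {
    while (position == current.length) {
      if (finished) {
        return -1;
      }
      try {
        current = chunks.take(); // blocks until the next chunk (or the sentinel) arrives
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for the next chunk", e);
      }
      position = 0;
      if (current == END) {
        finished = true;
      }
    }
    return current[position++] & 0xFF;
  }
}
```

A coder reading from this stream never sees the chunk boundaries, which is the
complexity each SDK has to hide on the reading side.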

Robert, would implementing the length prefix coder being backed by state +
adding a lazy decoding method to the iterable coder be significantly more
complicated than what you are proposing right now?

What do others think about coders supporting a "lazy" decode mode?

1: https://github.com/apache/beam/pull/7127
2: https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
3: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
4: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf

Re: Handling large values

Posted by Lukasz Cwik <lc...@google.com>.
On Thu, Nov 29, 2018 at 3:01 PM Robert Bradshaw <ro...@google.com> wrote:

> On Thu, Nov 29, 2018 at 7:08 PM Lukasz Cwik <lc...@google.com> wrote:
> >
> > On Thu, Nov 29, 2018 at 7:13 AM Robert Bradshaw <ro...@google.com> wrote:
> >>
> >> On Thu, Nov 29, 2018 at 2:18 AM Lukasz Cwik <lc...@google.com> wrote:
> >> >
> >> > I don't believe we would need to change any other coders since SeekableInputStream wouldn't change how a regular InputStream would work so coders that don't care about the implementation would still use it as a forward only input stream. Coders that care about seeking would use the new functionality.
> >>
> >> An API could be developed that makes this work, but the proposal of
> >>
> >> class SmartCoder<T> {
> >>   public T decode(InputStream is) {
> >>     if (is instanceof SeekableInputStream) {
> >>       return view((SeekableInputStream) is);
> >>     }
> >>     return decodeInternal(is);
> >>   }
> >> }
> >>
> >> would break if passed to (just as an example) the unmodified KV coder
> >>
> >> class KvCoder<K, V> {
> >>   public Kv<K, V> decode(InputStream is) {
> >>     return Kv.of(keyCoder.decode(is), valueCoder.decode(is));
> >>   }
> >> }
> >>
> >> when invoked with an InputStream that happens to be a
> >> SeekableInputStream if either keyCoder or valueCoder were instances of
> >> SmartCoder unless SmartCoder.view() did something really clever about
> >> advancing the provided stream the right amount without actually
> >> consuming it. This is particularly expensive/tricky for iterables
> >> where it's most useful.
> >
> >
> > Thanks for walking through this with me Robert.
> >
> > The issue is that the view needs to advance the stream if it wants to decode the components separately, this works naturally for the iterable coder since all decoding is done in order so that advances the stream automatically and for any component coder where it also supports being a view. For any coder that isn't advancing the stream in order has to have an index as part of its encoding. Using the KV coder as the example, the two strategies would be as follows:
> >
> > decode method is the same for both strategies
> > public KV<K, V> decode(InputStream is) {
> >   if (is instanceof SeekableInputStream) {
> >     return KVView((SeekableInputStream) is, keyCoder, valueCoder);
> >   }
> >   return Kv.of(keyCoder.decode(is), valueCoder.decode(is));
> > }
> >
> > forward only view decoding:
> > class KVView<K, V> extends KV<K, V> {
> >   K getKey() {
> >     if (!keyDecoded) {
> >       key = keyCoder.decode(is);
> >     }
> >     return key;
> >   }
> >
> >   V getValue() {
> >     // ensures the input stream has advanced to the value position
> >     getKey();
> >
> >     if (!valueDecoded) {
> >      value = valueCoder.decode(is);
> >     }
> >     return value;
> > }
> >
> > index based decoding:
> > class KVView<K, V> extends KV<K, V> {
> >   KVView(SeekableInputStream is, Coder<K> keyCoder, Coder<V> valueCoder) {
> >     valueOffset = readBigEndianInt(is);
> >     // ...
> >   }
> >   K getKey() {
> >     if (!keyDecoded) {
> >       is.seek(4);  // 4 bytes for big int index
> >       key = keyCoder.decode(is);
> >     }
> >     return key;
> >   }
> >
> >   V getValue() {
> >     if (!valueDecoded) {
> >      is.seek(valueOffset);
> >      value = valueCoder.decode(is);
> >     }
> >     return value;
> > }
> >
> > I believe for the KV case and the iterable case we will find that our coders are typically KV<LengthPrefix<Key>, LengthPrefix<Value>> and Iterable<LengthPrefix<Value>> which would mean that a smart coder could inspect the component coder and if its a length prefix coder, ask it to seek to the end of its value within the input stream which mean that a smart coder could understand the length of its components.
>
> I understand how KV coder can be made smart. My concern is the
> difficulty of having dumb coders with smart coder components. E.g.
> imagine a row coder
>
> class DumbRowCoder {
>   Row decode(InputStream is) {
>     List<Object> parts = ...
>     for (Coder c : componentCoders) {
>       // Smart coders *must* advance the input stream in case the
>       // subsequent coder is also dumb.
>       // Efficient seek will require more than continuation tokens
>       // over the FnAPI.
>       // Important ones like iterable are likely to be lazily written,
>       // and so won't know their length when they start encoding, but
>       // iterating it to discover the length defeats much of the goal
>       // of being lazy.
>       parts.add(c.decode(is));
>     }
>   }
> }
>

I agree that smart coders must advance the stream for interoperability with
dumb coders and also that efficient seek can't be built off of continuation
tokens.
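
A minimal sketch of a lazy decode that still advances the stream is below. It
assumes the value carries a 4-byte length prefix and uses a ByteBuffer as a
stand-in for a seekable stream; LazyLengthPrefixed and decodeLazily are
hypothetical names. It does not cover the lazily written iterable case raised
above, where the length is not known when encoding starts.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

/** Hypothetical helper: lazy view over a length-prefixed UTF-8 value. */
final class LazyLengthPrefixed {
  /**
   * Returns a view over the next length-prefixed value and moves {@code in}
   * past it, so that a caller unaware of laziness can keep reading.
   */
  static Supplier<String> decodeLazily(ByteBuffer in) {
    int length = in.getInt();
    ByteBuffer slice = in.slice(); // shares the bytes, has its own position
    slice.limit(length);
    in.position(in.position() + length); // advance without decoding the payload
    // duplicate() keeps the slice reusable if the view is read more than once
    return () -> StandardCharsets.UTF_8.decode(slice.duplicate()).toString();
  }
}
```

After decodeLazily returns, the next read on the buffer starts at the
following component, whether or not the view is ever materialized.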


> >> > For the encoding portion, the state backed length prefix coder would send the small snippet of data that it received plus the state key without invoking the component coder to encode the value. The downstream receiving party would need to lookup the remote reference to get all the data.
> >>
> >> I'm trying to follow what you're saying here. Are you restricting to
> >> the case of only encoding something that was formerly decoded with a
> >> state backed length prefix coder (and keeps the unencoded bytes
> >> around)?
> >
> > Yes.
> >
> >> It'd be good to support writing novel values lazily as well.
> >> Also, this brings up the issue of how to manage the lifetime of remote
> >> references if they can be extended in this way.
> >
> > I don't know of any data processing system that currently handles this and am unsure if this is a problem in practice for many people. If someone gets a giant string iterable and naively transforms it, for example by concatenating all the strings, their worker will crash.  Only solution I have seen is that people don't naively transform the value but instead pass around a view over the value that applies the transform as needed.
>
> I was thinking primarily of the case where the runner has such
> arbitrarily long iterables from a GBK and wants to encode them and
> send them to the SDK.
>

For some reason I was under the impression that you were trying to cover
the case where SDKs, not the runner, are trying to write out arbitrarily
long iterables. The runner is meant to use the remote reference concept
to cover this scenario.


> > Extending remote references to also be able to be created by SDKs is an interesting idea that I had explored a tiny bit in the past but then dropped it due to time constraints and for the fact that we had much more immediate things we wanted to get implemented.
>
> I think we're still in that boat :).
>
> >> > All other coders would not be lazy and would have to encode the entire lazy view, this could be optimized by copying the SeekableInputStream to the OutputStream. Note that the length prefix coder is never used with IOs and hence those IOs could be given a type like Iterable<Foo> which is lazy, but the encoding for that wouldn't be lazy
> >>
> >> Yes, that's how it is now.
> >>
> >> > and would output all the data from the SeekableInputStream.
> >> >
> >> >
> >> > On Wed, Nov 28, 2018 at 3:08 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >> >>
> >> >> On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik <lc...@google.com>
> wrote:
> >> >> >
> >> >> > Re-adding +datapls-portability-team@google.com +
> datapls-unified-worker@google.com
> >> >> >
> >> >> > On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <
> robertwb@google.com> wrote:
> >> >> >>
> >> >> >> Thanks for bringing this to the list. More below.
> >> >> >>
> >> >> >> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <ke...@apache.org>
> wrote:
> >> >> >>>
> >> >> >>> FWIW I deliberately limited the thread to not mix public and
> private lists, so people intending private replies do not accidentally send
> to dev@beam.
> >> >> >>>
> >> >> >>> I've left them on this time, to avoid contradicting your action,
> but I recommend removing them.
> >> >> >>>
> >> >> >>> Kenn
> >> >> >>>
> >> >> >>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com>
> wrote:
> >> >> >>>>
> >> >> >>>> Re-adding +datapls-portability-team@google.com +
> datapls-unified-worker@google.com
> >> >> >>>>
> >> >> >>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com>
> wrote:
> >> >> >>>>>
> >> >> >>>>> That is correct Kenn. An important point would be that
> SomeOtherCoder would be given a seekable stream (instead of the forward
> only stream it gets right now) so it can either decode all the data or
> lazily decode parts as it needs to as in the case of an iterable coder when
> used to support large iterables coming out of a GroupByKey.
> >> >> >>>>>
> >> >> >>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <
> kenn@apache.org> wrote:
> >> >> >>>>>>
> >> >> >>>>>> Interesting! Having large iterables within rows would be
> great for the interactions between SQL and the core SDK's schema/Row
> support, and we weren't sure how that could work, exactly.
> >> >> >>>>>>
> >> >> >>>>>> My (very basic) understanding would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
> followed by the encoding of SomeOtherCoder.
> >> >> >>>>>>
> >> >> >>>>>> So the new proposal would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding where it has a length
> followed by some number of bytes and if it ends with a special token
> (ignoring escaping issues) then you have to gather bytes from more messages
> in order to assemble a stream to send to SomeOtherCoder? Have I got what
> you mean? So this is a different, yet compatible, approach to sending over
> a special token that has to be looked up separately via the state read API?
> >> >> >>>>>>
> >> >> >>>>>> Kenn
> >> >> >>>>>>
> >> >> >>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <
> lcwik@google.com> wrote:
> >> >> >>>>>>>
> >> >> >>>>>>> There is a discussion happening on a PR 7127[1] where Robert
> is working on providing the first implementation for supporting large
> iterables resulting from a GroupByKey. This is inline with the original
> proposal for remote references over the Fn Data & State API[2].
> >> >> >>>>>>>
> >> >> >>>>>>> I had thought about this issue more since the original write
> up was done over a year ago and believe that we can simplify the
> implementation by migrating the length prefix coder to be able to embed a
> remote reference token at the end of the stream if the data is too large.
> This allows any coder which supports lazy decoding to return a view over a
> seekable stream instead of decoding all the data (regardless whether all
> the data was sent or there is a state token representing the remote
> reference).
> >> >> >>>>>>>
> >> >> >>>>>>> Allowing any arbitrary coder to support lazy decoding helps
> solve the large iterable use case but also opens up the ability for types
> which don't need to be fully decoded to provide lazy views. Imagine our
> Beam rows using a format where only rows that are read are decoded while
> everything else is left in its encoded form.
> >> >> >>>>>>>
> >> >> >>>>>>> I also originally thought that this could also help solve an
> issue where large values[3] need to be chunked across multiple protobuf
> messages over the Data API which complicates the reading side decoding
> implementation since each SDK needs to provide an implementation that
> blocks and waits for the next chunk to come across for the same logical
> stream[4]. But there are issues with this because the runner may make a bad
> coder choice such as iterable<length_prefix<blob>> (instead of
> length_prefix<iterable<blob>>) which can lead to > 2gb of state keys if
> there are many many values.
> >> >> >>
> >> >> >>
> >> >> >> Yes. I think this would need to be a separate coder than the
> length prefix coder.
> >> >> >>
> >> >> >>
> >> >> >>>>>>>
> >> >> >>>>>>> Robert, would implementing the length prefix coder being
> backed by state + adding a lazy decoding method to the iterable coder be
> significantly more complicated then what you are proposing right now?
> >> >> >>
> >> >> >>
> >> >> >> Yes, chopping things up at arbitrary byte boundaries (rather than
> element boundaries) tends to be significantly more subtle and complex
> (based on my experience with the data plane API). It would also require new
> public APIs for Coders.
> >> >> >
> >> >> >
> >> >> > After some further thought, I don't think we need to have a
> different API for coders, its just that they get a different implementation
> for the inputstream when decoding. So the logic would be:
> >> >> > public T decode(InputStream is) {
> >> >> >   if (is instanceof SeekableInputStream) {
> >> >> >     return view((SeekableInputStream) is);
> >> >> >   }
> >> >> >   return decodeInternal(is);
> >> >> > }
> >> >>
> >> >> SeekableInputStream is a new API. If we went this route of re-using
> >> >> decode, it'd be an easy bug to accidentally pass a
> SeekableInputStream
> >> >> to component coders which wouldn't do the right thing. (Perhaps all
> >> >> coders would have to be modified?) And encoding is less obvious (e.g.
> >> >> a subclass of OutputStream that takes a callback for the rest of the
> >> >> bytes? As chosen by the caller or the callee?).
> >> >>
> >> >> >> This is why I went with the more restricted (but still by far
> most common, and quite straightforward) case of supporting arbitrarily
> large iterables (which can still occur at any level of nesting, e.g. inside
> rows), leaving the general case as future work.
> >> >> >>
> >> >> >>
> >> >> >>>>>>>
> >> >> >>>>>>> What do others think about coders supporting a "lazy" decode
> mode in coders?
> >> >> >>>>>>>
> >> >> >>>>>>> 1: https://github.com/apache/beam/pull/7127
> >> >> >>>>>>> 2:
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
> >> >> >>>>>>> 3:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
> >> >> >>>>>>> 4:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
> >> >> >
>

Re: Handling large values

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Nov 29, 2018 at 7:08 PM Lukasz Cwik <lc...@google.com> wrote:
>
> On Thu, Nov 29, 2018 at 7:13 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> On Thu, Nov 29, 2018 at 2:18 AM Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > I don't believe we would need to change any other coders since SeekableInputStream wouldn't change how a regular InputStream would work so coders that don't care about the implementation would still use it as a forward only input stream. Coders that care about seeking would use the new functionality.
>>
>> An API could be developed that makes this work, but the proposal of
>>
>> class SmartCoder<T> {
>>   public T decode(InputStream is) {
>>     if (is instanceof SeekableInputStream) {
>>       return view((SeekableInputStream) is);
>>     }
>>     return decodeInternal(is);
>>   }
>> }
>>
>> would break if passed to (just as an example) the unmodified KV coder
>>
>> class KvCoder<K, V> {
>>   public Kv<K, V> decode(InputStream is) {
>>     return Kv.of(keyCoder.decode(is), valueCoder.decode(is));
>>   }
>> }
>>
>> when invoked with an InputStream that happens to be a
>> SeekableInputStream if either keyCoder or valueCoder were instances of
>> SmartCoder unless SmartCoder.view() did something really clever about
>> advancing the provided stream the right amount without actually
>> consuming it. This is particularly expensive/tricky for iterables
>> where it's most useful.
>
>
> Thanks for walking through this with me Robert.
>
> The issue is that the view needs to advance the stream if it wants to decode the components separately, this works naturally for the iterable coder since all decoding is done in order so that advances the stream automatically and for any component coder where it also supports being a view. For any coder that isn't advancing the stream in order has to have an index as part of its encoding. Using the KV coder as the example, the two strategies would be as follows:
>
> decode method is the same for both strategies
> public KV<K, V> decode(InputStream is) {
>   if (is instanceof SeekableInputStream) {
>     return KVView((SeekableInputStream) is, keyCoder, valueCoder);
>   }
>   return Kv.of(keyCoder.decode(is), valueCoder.decode(is));
> }
>
> forward only view decoding:
> class KVView<K, V> extends KV<K, V> {
>   K getKey() {
>     if (!keyDecoded) {
>       key = keyCoder.decode(is);
>     }
>     return key;
>   }
>
>   V getValue() {
>     // ensures the input stream has advanced to the value position
>     getKey();
>
>     if (!valueDecoded) {
>      value = valueCoder.decode(is);
>     }
>     return value;
> }
>
> index based decoding:
> class KVView<K, V> extends KV<K, V> {
>   KVView(SeekableInputStream is, Coder<K> keyCoder, Coder<V> valueCoder) {
>     valueOffset = readBigEndianInt(is);
>     // ...
>   }
>   K getKey() {
>     if (!keyDecoded) {
>       is.seek(4);  // 4 bytes for big int index
>       key = keyCoder.decode(is);
>     }
>     return key;
>   }
>
>   V getValue() {
>     if (!valueDecoded) {
>      is.seek(valueOffset);
>      value = valueCoder.decode(is);
>     }
>     return value;
> }
>
> I believe for the KV case and the iterable case we will find that our coders are typically KV<LengthPrefix<Key>, LengthPrefix<Value>> and Iterable<LengthPrefix<Value>> which would mean that a smart coder could inspect the component coder and if its a length prefix coder, ask it to seek to the end of its value within the input stream which mean that a smart coder could understand the length of its components.

I understand how KV coder can be made smart. My concern is the
difficulty of having dumb coders with smart coder components. E.g.
imagine a row coder

class DumbRowCoder {
  Row decode(InputStream is) {
    List<Object> parts = ...
    for (Coder c : componentCoders) {
      // Smart coders *must* advance the input stream in case the
      // subsequent coder is also dumb.
      // Efficient seek will require more than continuation tokens
      // over the FnAPI.
      // Important ones like iterable are likely to be lazily written,
      // and so won't know their length when they start encoding, but
      // iterating it to discover the length defeats much of the goal
      // of being lazy.
      parts.add(c.decode(is));
    }
  }
}
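
The failure mode can be reproduced with a small sketch. Everything here is a
stand-in chosen for illustration: a ByteBuffer plays the role of the input
stream, Function<ByteBuffer, Object> plays the role of a component coder, and
DumbRowDemo, EAGER_INT and NON_ADVANCING_VIEW are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical demo of a dumb composite coder with a lazy component that does not advance. */
final class DumbRowDemo {
  /** Eager component: reads a 4-byte int and advances the buffer. */
  static final Function<ByteBuffer, Object> EAGER_INT = in -> in.getInt();

  /** Broken lazy component: remembers where its value starts but never advances the buffer. */
  static final Function<ByteBuffer, Object> NON_ADVANCING_VIEW = in -> {
    int start = in.position();
    return (Supplier<Integer>) () -> in.getInt(start);
  };

  /** Dumb row decode: calls each component in order and trusts it to advance the buffer. */
  static List<Object> decodeRow(ByteBuffer in, List<Function<ByteBuffer, Object>> components) {
    List<Object> parts = new ArrayList<>();
    for (Function<ByteBuffer, Object> component : components) {
      parts.add(component.apply(in));
    }
    return parts;
  }
}
```

With the two ints 7 and 9 encoded back to back, decoding with
[NON_ADVANCING_VIEW, EAGER_INT] makes the second component read 7 again
instead of 9, because the lazy component left the position untouched.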

>> > For the encoding portion, the state backed length prefix coder would send the small snippet of data that it received plus the state key without invoking the component coder to encode the value. The downstream receiving party would need to lookup the remote reference to get all the data.
>>
>> I'm trying to follow what you're saying here. Are you restricting to
>> the case of only encoding something that was formerly decoded with a
>> state backed length prefix coder (and keeps the unencoded bytes
>> around)?
>
> Yes.
>
>> It'd be good to support writing novel values lazily as well.
>> Also, this brings up the issue of how to manage the lifetime of remote
>> references if they can be extended in this way.
>
> I don't know of any data processing system that currently handles this and am unsure if this is a problem in practice for many people. If someone gets a giant string iterable and naively transforms it, for example by concatenating all the strings, their worker will crash.  Only solution I have seen is that people don't naively transform the value but instead pass around a view over the value that applies the transform as needed.

I was thinking primarily of the case where the runner has such
arbitrarily long iterables from a GBK and wants to encode them and
send them to the SDK.

> Extending remote references to also be able to be created by SDKs is an interesting idea that I had explored a tiny bit in the past but then dropped it due to time constraints and for the fact that we had much more immediate things we wanted to get implemented.

I think we're still in that boat :).

>> > All other coders would not be lazy and would have to encode the entire lazy view, this could be optimized by copying the SeekableInputStream to the OutputStream. Note that the length prefix coder is never used with IOs and hence those IOs could be given a type like Iterable<Foo> which is lazy, but the encoding for that wouldn't be lazy
>>
>> Yes, that's how it is now.
>>
>> > and would output all the data from the SeekableInputStream.
>> >
>> >
>> > On Wed, Nov 28, 2018 at 3:08 PM Robert Bradshaw <ro...@google.com> wrote:
>> >>
>> >> On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >
>> >> > Re-adding +datapls-portability-team@google.com +datapls-unified-worker@google.com
>> >> >
>> >> > On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <ro...@google.com> wrote:
>> >> >>
>> >> >> Thanks for bringing this to the list. More below.
>> >> >>
>> >> >> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <ke...@apache.org> wrote:
>> >> >>>
>> >> >>> FWIW I deliberately limited the thread to not mix public and private lists, so people intending private replies do not accidentally send to dev@beam.
>> >> >>>
>> >> >>> I've left them on this time, to avoid contradicting your action, but I recommend removing them.
>> >> >>>
>> >> >>> Kenn
>> >> >>>
>> >> >>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >>>>
>> >> >>>> Re-adding +datapls-portability-team@google.com +datapls-unified-worker@google.com
>> >> >>>>
>> >> >>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >>>>>
>> >> >>>>> That is correct Kenn. An important point would be that SomeOtherCoder would be given a seekable stream (instead of the forward only stream it gets right now) so it can either decode all the data or lazily decode parts as it needs to as in the case of an iterable coder when used to support large iterables coming out of a GroupByKey.
>> >> >>>>>
>> >> >>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <ke...@apache.org> wrote:
>> >> >>>>>>
>> >> >>>>>> Interesting! Having large iterables within rows would be great for the interactions between SQL and the core SDK's schema/Row support, and we weren't sure how that could work, exactly.
>> >> >>>>>>
>> >> >>>>>> My (very basic) understanding would be that LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length followed by the encoding of SomeOtherCoder.
>> >> >>>>>>
>> >> >>>>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has an encoding where it has a length followed by some number of bytes and if it ends with a special token (ignoring escaping issues) then you have to gather bytes from more messages in order to assemble a stream to send to SomeOtherCoder? Have I got what you mean? So this is a different, yet compatible, approach to sending over a special token that has to be looked up separately via the state read API?
>> >> >>>>>>
>> >> >>>>>> Kenn
>> >> >>>>>>
>> >> >>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >>>>>>>
>> >> >>>>>>> There is a discussion happening on a PR 7127[1] where Robert is working on providing the first implementation for supporting large iterables resulting from a GroupByKey. This is inline with the original proposal for remote references over the Fn Data & State API[2].
>> >> >>>>>>>
>> >> >>>>>>> I had thought about this issue more since the original write up was done over a year ago and believe that we can simplify the implementation by migrating the length prefix coder to be able to embed a remote reference token at the end of the stream if the data is too large. This allows any coder which supports lazy decoding to return a view over a seekable stream instead of decoding all the data (regardless whether all the data was sent or there is a state token representing the remote reference).
>> >> >>>>>>>
>> >> >>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve the large iterable use case but also opens up the ability for types which don't need to be fully decoded to provide lazy views. Imagine our Beam rows using a format where only rows that are read are decoded while everything else is left in its encoded form.
>> >> >>>>>>>
>> >> >>>>>>> I also originally thought that this could also help solve an issue where large values[3] need to be chunked across multiple protobuf messages over the Data API which complicates the reading side decoding implementation since each SDK needs to provide an implementation that blocks and waits for the next chunk to come across for the same logical stream[4]. But there are issues with this because the runner may make a bad coder choice such as iterable<length_prefix<blob>> (instead of length_prefix<iterable<blob>>) which can lead to > 2gb of state keys if there are many many values.
>> >> >>
>> >> >>
>> >> >> Yes. I think this would need to be a separate coder than the length prefix coder.
>> >> >>
>> >> >>
>> >> >>>>>>>
>> >> >>>>>>> Robert, would implementing the length prefix coder being backed by state + adding a lazy decoding method to the iterable coder be significantly more complicated then what you are proposing right now?
>> >> >>
>> >> >>
>> >> >> Yes, chopping things up at arbitrary byte boundaries (rather than element boundaries) tends to be significantly more subtle and complex (based on my experience with the data plane API). It would also require new public APIs for Coders.
>> >> >
>> >> >
>> >> > After some further thought, I don't think we need to have a different API for coders, its just that they get a different implementation for the inputstream when decoding. So the logic would be:
>> >> > public T decode(InputStream is) {
>> >> >   if (is instanceof SeekableInputStream) {
>> >> >     return view((SeekableInputStream) is);
>> >> >   }
>> >> >   return decodeInternal(is);
>> >> > }
>> >>
>> >> SeekableInputStream is a new API. If we went this route of re-using
>> >> decode, it'd be an easy bug to accidentally pass a SeekableInputStream
>> >> to component coders which wouldn't do the right thing. (Perhaps all
>> >> coders would have to be modified?) And encoding is less obvious (e.g.
>> >> a subclass of OutputStream that takes a callback for the rest of the
>> >> bytes? As chosen by the caller or the callee?).
>> >>
>> >> >> This is why I went with the more restricted (but still by far most common, and quite straightforward) case of supporting arbitrarily large iterables (which can still occur at any level of nesting, e.g. inside rows), leaving the general case as future work.
>> >> >>
>> >> >>
>> >> >>>>>>>
>> >> >>>>>>> What do others think about coders supporting a "lazy" decode mode in coders?
>> >> >>>>>>>
>> >> >>>>>>> 1: https://github.com/apache/beam/pull/7127
>> >> >>>>>>> 2: https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>> >> >>>>>>> 3: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>> >> >>>>>>> 4: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>> >> >

Re: Handling large values

Posted by Lukasz Cwik <lc...@google.com>.
On Thu, Nov 29, 2018 at 7:13 AM Robert Bradshaw <ro...@google.com> wrote:

> On Thu, Nov 29, 2018 at 2:18 AM Lukasz Cwik <lc...@google.com> wrote:
> >
> > I don't believe we would need to change any other coders since
> SeekableInputStream wouldn't change how a regular InputStream would work so
> coders that don't care about the implementation would still use it as a
> forward only input stream. Coders that care about seeking would use the new
> functionality.
>
> An API could be developed that makes this work, but the proposal of
>
> class SmartCoder<T> {
>   public T decode(InputStream is) {
>     if (is instanceof SeekableInputStream) {
>       return view((SeekableInputStream) is);
>     }
>     return decodeInternal(is);
>   }
> }
>
> would break if passed to (just as an example) the unmodified KV coder
>
> class KvCoder<K, V> {
>   public Kv<K, V> decode(InputStream is) {
>     return Kv.of(keyCoder.decode(is), valueCoder.decode(is));
>   }
> }
>
> when invoked with an InputStream that happens to be a
> SeekableInputStream if either keyCoder or valueCoder were instances of
> SmartCoder unless SmartCoder.view() did something really clever about
> advancing the provided stream the right amount without actually
> consuming it. This is particularly expensive/tricky for iterables
> where it's most useful.
>

Thanks for walking through this with me, Robert.

The issue is that the view needs to advance the stream if it wants to
decode the components separately. This works naturally for the iterable
coder, since all decoding is done in order and so advances the stream
automatically, and for any component coder that also supports being a
view. Any coder that doesn't advance the stream in order has to have an
index as part of its encoding. Using the KV coder as the example, the two
strategies would be as follows:

The decode method is the same for both strategies:
public KV<K, V> decode(InputStream is) {
  if (is instanceof SeekableInputStream) {
    return new KVView<>((SeekableInputStream) is, keyCoder, valueCoder);
  }
  return KV.of(keyCoder.decode(is), valueCoder.decode(is));
}

Forward-only view decoding:
class KVView<K, V> extends KV<K, V> {
  K getKey() {
    if (!keyDecoded) {
      key = keyCoder.decode(is);
      keyDecoded = true;
    }
    return key;
  }

  V getValue() {
    // Ensures the input stream has advanced to the value position.
    getKey();

    if (!valueDecoded) {
      value = valueCoder.decode(is);
      valueDecoded = true;
    }
    return value;
  }
}

Index-based decoding:
class KVView<K, V> extends KV<K, V> {
  KVView(SeekableInputStream is, Coder<K> keyCoder, Coder<V> valueCoder) {
    valueOffset = readBigEndianInt(is);
    // ...
  }

  K getKey() {
    if (!keyDecoded) {
      is.seek(4);  // skip the 4-byte big-endian int index
      key = keyCoder.decode(is);
      keyDecoded = true;
    }
    return key;
  }

  V getValue() {
    if (!valueDecoded) {
      is.seek(valueOffset);
      value = valueCoder.decode(is);
      valueDecoded = true;
    }
    return value;
  }
}

I believe for the KV case and the iterable case we will find that our
coders are typically KV<LengthPrefix<Key>, LengthPrefix<Value>> and
Iterable<LengthPrefix<Value>>, which would mean that a smart coder could
inspect the component coder and, if it's a length prefix coder, ask it to
seek to the end of its value within the input stream. A smart coder could
therefore learn the length of its components without decoding them.
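
As a rough sketch of that idea (not a proposed implementation), the view
below uses the length prefixes to locate the key and value and to advance
the stream past the whole KV without decoding either component. Everything
here is an assumption made for illustration: the in-memory
SeekableInputStream stand-in, the LengthPrefixedKvView name, String
components, and a fixed 4-byte big-endian length (the real length prefix
coder uses a different length encoding).

```java
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical in-memory stand-in for the proposed SeekableInputStream. */
class SeekableInputStream extends InputStream {
  private final byte[] buf;
  private int pos;

  SeekableInputStream(byte[] buf) { this.buf = buf; }

  @Override
  public int read() { return pos < buf.length ? (buf[pos++] & 0xff) : -1; }

  void seek(int newPos) { pos = newPos; }

  int position() { return pos; }

  /** Reads a 4-byte big-endian int at the current position. */
  int readInt() {
    int value = ByteBuffer.wrap(buf, pos, 4).getInt();
    pos += 4;
    return value;
  }

  /** Reads the next n bytes at the current position. */
  byte[] readBytes(int n) {
    byte[] out = Arrays.copyOfRange(buf, pos, pos + n);
    pos += n;
    return out;
  }
}

/** Lazy view over a KV whose key and value are each length-prefixed (assumed layout). */
class LengthPrefixedKvView {
  private final SeekableInputStream is;
  private final int keyOffset;
  private final int valueOffset;

  LengthPrefixedKvView(SeekableInputStream is) {
    this.is = is;
    keyOffset = is.position();
    valueOffset = endOfComponent(keyOffset);
    // Leave the stream just past this KV so an enclosing coder can keep reading.
    is.seek(endOfComponent(valueOffset));
  }

  /** Reads only the length prefix at offset and returns the offset past that component. */
  private int endOfComponent(int offset) {
    is.seek(offset);
    return offset + 4 + is.readInt();
  }

  String getKey() { return decodeAt(keyOffset); }

  String getValue() { return decodeAt(valueOffset); }

  /** Decodes one component on demand, then restores the caller's stream position. */
  private String decodeAt(int offset) {
    int saved = is.position();
    is.seek(offset);
    String decoded = new String(is.readBytes(is.readInt()), StandardCharsets.UTF_8);
    is.seek(saved);
    return decoded;
  }

  /** Helper for the sketch: writes key and value, each behind a 4-byte length. */
  static byte[] encode(String key, String value) {
    byte[] k = key.getBytes(StandardCharsets.UTF_8);
    byte[] v = value.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(8 + k.length + v.length)
        .putInt(k.length).put(k).putInt(v.length).put(v).array();
  }
}
```

With this layout the value can be read without ever decoding the key, and
an unmodified enclosing coder still sees the stream positioned after the KV.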


> > For the encoding portion, the state backed length prefix coder would
> send the small snippet of data that it received plus the state key without
> invoking the component coder to encode the value. The downstream receiving
> party would need to lookup the remote reference to get all the data.
>
> I'm trying to follow what you're saying here. Are you restricting to
> the case of only encoding something that was formerly decoded with a
> state backed length prefix coder (and keeps the unencoded bytes
> around)?


Yes.


> It'd be good to support writing novel values lazily as well.
> Also, this brings up the issue of how to manage the lifetime of remote
> references if they can be extended in this way.
>

I don't know of any data processing system that currently handles this, and
I am unsure whether this is a problem in practice for many people. If
someone gets a giant string iterable and naively transforms it, for example
by concatenating all the strings, their worker will crash. The only solution
I have seen is that people don't naively transform the value but instead
pass around a view over the value that applies the transform as needed.
Extending remote references so that they can also be created by SDKs is an
interesting idea that I explored a tiny bit in the past but dropped due to
time constraints and because we had much more immediate things we wanted to
get implemented.
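
To make the "view that applies the transform as needed" pattern concrete,
here is a minimal sketch. LazyMappedIterable is a made-up name for this
illustration, not an existing Beam class; it wraps a (possibly huge)
iterable and applies the function one element at a time during iteration,
so the transformed values are never all materialized at once.

```java
import java.util.Iterator;
import java.util.function.Function;

/** Hypothetical view that applies fn lazily, one element per call to next(). */
class LazyMappedIterable<A, B> implements Iterable<B> {
  private final Iterable<A> source;
  private final Function<A, B> fn;

  LazyMappedIterable(Iterable<A> source, Function<A, B> fn) {
    this.source = source;
    this.fn = fn;
  }

  @Override
  public Iterator<B> iterator() {
    Iterator<A> it = source.iterator();
    return new Iterator<B>() {
      @Override
      public boolean hasNext() { return it.hasNext(); }

      @Override
      public B next() { return fn.apply(it.next()); }  // transform happens here, on demand
    };
  }
}
```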


> > All other coders would not be lazy and would have to encode the entire
> lazy view; this could be optimized by copying the
> SeekableInputStream to the OutputStream. Note that the length prefix coder
> is never used with IOs and hence those IOs could be given a type like
> Iterable<Foo> which is lazy, but the encoding for that wouldn't be lazy
>
> Yes, that's how it is now.
>
> > and would output all the data from the SeekableInputStream.
> >
> >
> > On Wed, Nov 28, 2018 at 3:08 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik <lc...@google.com> wrote:
> >> >
> >> > Re-adding +datapls-portability-team@google.com +
> datapls-unified-worker@google.com
> >> >
> >> > On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >> >>
> >> >> Thanks for bringing this to the list. More below.
> >> >>
> >> >> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <ke...@apache.org>
> wrote:
> >> >>>
> >> >>> FWIW I deliberately limited the thread to not mix public and
> private lists, so people intending private replies do not accidentally send
> to dev@beam.
> >> >>>
> >> >>> I've left them on this time, to avoid contradicting your action,
> but I recommend removing them.
> >> >>>
> >> >>> Kenn
> >> >>>
> >> >>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com>
> wrote:
> >> >>>>
> >> >>>> Re-adding +datapls-portability-team@google.com +
> datapls-unified-worker@google.com
> >> >>>>
> >> >>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com>
> wrote:
> >> >>>>>
> >> >>>>> That is correct Kenn. An important point would be that
> SomeOtherCoder would be given a seekable stream (instead of the forward
> only stream it gets right now) so it can either decode all the data or
> lazily decode parts as it needs to as in the case of an iterable coder when
> used to support large iterables coming out of a GroupByKey.
> >> >>>>>
> >> >>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <ke...@apache.org>
> wrote:
> >> >>>>>>
> >> >>>>>> Interesting! Having large iterables within rows would be great
> for the interactions between SQL and the core SDK's schema/Row support, and
> we weren't sure how that could work, exactly.
> >> >>>>>>
> >> >>>>>> My (very basic) understanding would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
> followed by the encoding of SomeOtherCoder.
> >> >>>>>>
> >> >>>>>> So the new proposal would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding where it has a length
> followed by some number of bytes and if it ends with a special token
> (ignoring escaping issues) then you have to gather bytes from more messages
> in order to assemble a stream to send to SomeOtherCoder? Have I got what
> you mean? So this is a different, yet compatible, approach to sending over
> a special token that has to be looked up separately via the state read API?
> >> >>>>>>
> >> >>>>>> Kenn
> >> >>>>>>
> >> >>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com>
> wrote:
> >> >>>>>>>
> >> >>>>>>> There is a discussion happening on a PR 7127[1] where Robert is
> working on providing the first implementation for supporting large
> iterables resulting from a GroupByKey. This is inline with the original
> proposal for remote references over the Fn Data & State API[2].
> >> >>>>>>>
> >> >>>>>>> I had thought about this issue more since the original write up
> was done over a year ago and believe that we can simplify the
> implementation by migrating the length prefix coder to be able to embed a
> remote reference token at the end of the stream if the data is too large.
> This allows any coder which supports lazy decoding to return a view over a
> seekable stream instead of decoding all the data (regardless whether all
> the data was sent or there is a state token representing the remote
> reference).
> >> >>>>>>>
> >> >>>>>>> Allowing any arbitrary coder to support lazy decoding helps
> solve the large iterable use case but also opens up the ability for types
> which don't need to be fully decoded to provide lazy views. Imagine our
> Beam rows using a format where only rows that are read are decoded while
> everything else is left in its encoded form.
> >> >>>>>>>
> >> >>>>>>> I also originally thought that this could also help solve an
> issue where large values[3] need to be chunked across multiple protobuf
> messages over the Data API which complicates the reading side decoding
> implementation since each SDK needs to provide an implementation that
> blocks and waits for the next chunk to come across for the same logical
> stream[4]. But there are issues with this because the runner may make a bad
> coder choice such as iterable<length_prefix<blob>> (instead of
> length_prefix<iterable<blob>>) which can lead to > 2gb of state keys if
> there are many many values.
> >> >>
> >> >>
> >> >> Yes. I think this would need to be a separate coder than the length
> prefix coder.
> >> >>
> >> >>
> >> >>>>>>>
> >> >>>>>>> Robert, would implementing the length prefix coder being backed
> by state + adding a lazy decoding method to the iterable coder be
> significantly more complicated than what you are proposing right now?
> >> >>
> >> >>
> >> >> Yes, chopping things up at arbitrary byte boundaries (rather than
> element boundaries) tends to be significantly more subtle and complex
> (based on my experience with the data plane API). It would also require new
> public APIs for Coders.
> >> >
> >> >
> >> > After some further thought, I don't think we need to have a different
> API for coders, it's just that they get a different implementation for the
> inputstream when decoding. So the logic would be:
> >> > public T decode(InputStream is) {
> >> >   if (is instanceof SeekableInputStream) {
> >> >     return view((SeekableInputStream) is);
> >> >   }
> >> >   return decodeInternal(is);
> >> > }
> >>
> >> SeekableInputStream is a new API. If we went this route of re-using
> >> decode, it'd be an easy bug to accidentally pass a SeekableInputStream
> >> to component coders which wouldn't do the right thing. (Perhaps all
> >> coders would have to be modified?) And encoding is less obvious (e.g.
> >> a subclass of OutputStream that takes a callback for the rest of the
> >> bytes? As chosen by the caller or the callee?).
> >>
> >> >> This is why I went with the more restricted (but still by far most
> common, and quite straightforward) case of supporting arbitrarily large
> iterables (which can still occur at any level of nesting, e.g. inside
> rows), leaving the general case as future work.
> >> >>
> >> >>
> >> >>>>>>>
> >> >>>>>>> What do others think about coders supporting a "lazy" decode
> mode in coders?
> >> >>>>>>>
> >> >>>>>>> 1: https://github.com/apache/beam/pull/7127
> >> >>>>>>> 2:
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
> >> >>>>>>> 3:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
> >> >>>>>>> 4:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
> >> >
>

Re: Handling large values

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Nov 29, 2018 at 2:18 AM Lukasz Cwik <lc...@google.com> wrote:
>
> I don't believe we would need to change any other coders since SeekableInputStream wouldn't change how a regular InputStream would work so coders that don't care about the implementation would still use it as a forward only input stream. Coders that care about seeking would use the new functionality.

An API could be developed that makes this work, but the proposal of

class SmartCoder<T> {
  public T decode(InputStream is) {
    if (is instanceof SeekableInputStream) {
      return view((SeekableInputStream) is);
    }
    return decodeInternal(is);
  }
}

would break if passed to (just as an example) the unmodified KV coder

class KvCoder<K, V> {
  public Kv<K, V> decode(InputStream is) {
    return Kv.of(keyCoder.decode(is), valueCoder.decode(is));
  }
}

when invoked with an InputStream that happens to be a
SeekableInputStream if either keyCoder or valueCoder were instances of
SmartCoder unless SmartCoder.view() did something really clever about
advancing the provided stream the right amount without actually
consuming it. This is particularly expensive/tricky for iterables
where it's most useful.
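
A minimal sketch of this failure mode, under simplified assumptions: a
ByteArrayInputStream stands in for the seekable stream, strings are encoded
as a 1-byte length plus UTF-8 bytes, and the class and method names are
invented for the illustration. The key "coder" returns a lazy view without
consuming anything, the value is decoded eagerly from the same stream, and
the two end up swapped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

class LazyDecodePitfall {
  /** Eagerly decodes one string: a 1-byte length followed by UTF-8 bytes. */
  static String decodeEagerly(ByteArrayInputStream is) {
    byte[] payload = new byte[is.read()];
    is.read(payload, 0, payload.length);
    return new String(payload, StandardCharsets.UTF_8);
  }

  /** "Smart" decode: returns a view and consumes nothing until get() is called. */
  static Supplier<String> decodeLazily(ByteArrayInputStream is) {
    return () -> decodeEagerly(is);
  }

  /** Mirrors the unmodified KV decode: key first, then value, from the same stream. */
  static String[] decodeKv(byte[] encoded) {
    ByteArrayInputStream is = new ByteArrayInputStream(encoded);
    Supplier<String> key = decodeLazily(is);  // stream is NOT advanced past the key
    String value = decodeEagerly(is);         // so this reads the key's bytes
    return new String[] {key.get(), value};   // and the view now reads the value's bytes
  }

  /** Helper for the sketch: encodes key then value in the format above. */
  static byte[] encodeKv(String key, String value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (String s : new String[] {key, value}) {
      byte[] payload = s.getBytes(StandardCharsets.UTF_8);
      out.write(payload.length);
      out.write(payload, 0, payload.length);
    }
    return out.toByteArray();
  }
}
```

Decoding the encoding of ("k", "hello") this way yields key "hello" and
value "k", with no error raised anywhere.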

> For the encoding portion, the state backed length prefix coder would send the small snippet of data that it received plus the state key without invoking the component coder to encode the value. The downstream receiving party would need to lookup the remote reference to get all the data.

I'm trying to follow what you're saying here. Are you restricting to
the case of only encoding something that was formerly decoded with a
state backed length prefix coder (and keeps the unencoded bytes
around)? It'd be good to support writing novel values lazily as well.
Also, this brings up the issue of how to manage the lifetime of remote
references if they can be extended in this way.

> All other coders would not be lazy and would have to encode the entire lazy view; this could be optimized by copying the SeekableInputStream to the OutputStream. Note that the length prefix coder is never used with IOs and hence those IOs could be given a type like Iterable<Foo> which is lazy, but the encoding for that wouldn't be lazy

Yes, that's how it is now.

> and would output all the data from the SeekableInputStream.
>
>
> On Wed, Nov 28, 2018 at 3:08 PM Robert Bradshaw <ro...@google.com> wrote:
>>
>> On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > Re-adding +datapls-portability-team@google.com +datapls-unified-worker@google.com
>> >
>> > On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <ro...@google.com> wrote:
>> >>
>> >> Thanks for bringing this to the list. More below.
>> >>
>> >> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <ke...@apache.org> wrote:
>> >>>
>> >>> FWIW I deliberately limited the thread to not mix public and private lists, so people intending private replies do not accidentally send to dev@beam.
>> >>>
>> >>> I've left them on this time, to avoid contradicting your action, but I recommend removing them.
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:
>> >>>>
>> >>>> Re-adding +datapls-portability-team@google.com +datapls-unified-worker@google.com
>> >>>>
>> >>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> wrote:
>> >>>>>
>> >>>>> That is correct Kenn. An important point would be that SomeOtherCoder would be given a seekable stream (instead of the forward only stream it gets right now) so it can either decode all the data or lazily decode parts as it needs to as in the case of an iterable coder when used to support large iterables coming out of a GroupByKey.
>> >>>>>
>> >>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <ke...@apache.org> wrote:
>> >>>>>>
>> >>>>>> Interesting! Having large iterables within rows would be great for the interactions between SQL and the core SDK's schema/Row support, and we weren't sure how that could work, exactly.
>> >>>>>>
>> >>>>>> My (very basic) understanding would be that LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length followed by the encoding of SomeOtherCoder.
>> >>>>>>
>> >>>>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has an encoding where it has a length followed by some number of bytes and if it ends with a special token (ignoring escaping issues) then you have to gather bytes from more messages in order to assemble a stream to send to SomeOtherCoder? Have I got what you mean? So this is a different, yet compatible, approach to sending over a special token that has to be looked up separately via the state read API?
>> >>>>>>
>> >>>>>> Kenn
>> >>>>>>
>> >>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>> >>>>>>>
>> >>>>>>> There is a discussion happening on a PR 7127[1] where Robert is working on providing the first implementation for supporting large iterables resulting from a GroupByKey. This is inline with the original proposal for remote references over the Fn Data & State API[2].
>> >>>>>>>
>> >>>>>>> I had thought about this issue more since the original write up was done over a year ago and believe that we can simplify the implementation by migrating the length prefix coder to be able to embed a remote reference token at the end of the stream if the data is too large. This allows any coder which supports lazy decoding to return a view over a seekable stream instead of decoding all the data (regardless whether all the data was sent or there is a state token representing the remote reference).
>> >>>>>>>
>> >>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve the large iterable use case but also opens up the ability for types which don't need to be fully decoded to provide lazy views. Imagine our Beam rows using a format where only rows that are read are decoded while everything else is left in its encoded form.
>> >>>>>>>
>> >>>>>>> I also originally thought that this could also help solve an issue where large values[3] need to be chunked across multiple protobuf messages over the Data API which complicates the reading side decoding implementation since each SDK needs to provide an implementation that blocks and waits for the next chunk to come across for the same logical stream[4]. But there are issues with this because the runner may make a bad coder choice such as iterable<length_prefix<blob>> (instead of length_prefix<iterable<blob>>) which can lead to > 2gb of state keys if there are many many values.
>> >>
>> >>
>> >> Yes. I think this would need to be a separate coder than the length prefix coder.
>> >>
>> >>
>> >>>>>>>
>> >>>>>>> Robert, would implementing the length prefix coder being backed by state + adding a lazy decoding method to the iterable coder be significantly more complicated than what you are proposing right now?
>> >>
>> >>
>> >> Yes, chopping things up at arbitrary byte boundaries (rather than element boundaries) tends to be significantly more subtle and complex (based on my experience with the data plane API). It would also require new public APIs for Coders.
>> >
>> >
>> > After some further thought, I don't think we need to have a different API for coders, it's just that they get a different implementation for the inputstream when decoding. So the logic would be:
>> > public T decode(InputStream is) {
>> >   if (is instanceof SeekableInputStream) {
>> >     return view((SeekableInputStream) is);
>> >   }
>> >   return decodeInternal(is);
>> > }
>>
>> SeekableInputStream is a new API. If we went this route of re-using
>> decode, it'd be an easy bug to accidentally pass a SeekableInputStream
>> to component coders which wouldn't do the right thing. (Perhaps all
>> coders would have to be modified?) And encoding is less obvious (e.g.
>> a subclass of OutputStream that takes a callback for the rest of the
>> bytes? As chosen by the caller or the callee?).
>>
>> >> This is why I went with the more restricted (but still by far most common, and quite straightforward) case of supporting arbitrarily large iterables (which can still occur at any level of nesting, e.g. inside rows), leaving the general case as future work.
>> >>
>> >>
>> >>>>>>>
>> >>>>>>> What do others think about coders supporting a "lazy" decode mode in coders?
>> >>>>>>>
>> >>>>>>> 1: https://github.com/apache/beam/pull/7127
>> >>>>>>> 2: https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>> >>>>>>> 3: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>> >>>>>>> 4: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>> >

Re: Handling large values

Posted by Lukasz Cwik <lc...@google.com>.
I don't believe we would need to change any other coders, since
SeekableInputStream wouldn't change how a regular InputStream works, so
coders that don't care about the implementation would still use it as a
forward-only input stream. Coders that care about seeking would use the new
functionality.

For the encoding portion, the state backed length prefix coder would send
the small snippet of data that it received plus the state key without
invoking the component coder to encode the value. The downstream receiving
party would need to look up the remote reference to get all the data. All
other coders would not be lazy and would have to encode the entire lazy
view; this could be optimized by copying the SeekableInputStream to
the OutputStream. Note that the length prefix coder is never used with IOs,
and hence those IOs could be given a type like Iterable<Foo> which is lazy,
but the encoding for that wouldn't be lazy and would output all the data
from the SeekableInputStream.
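
The copy optimization mentioned above could look roughly like the sketch
below. The LazyViewEncoding class and copyEncoded method are names made up
for the illustration; the point is only that a non-lazy coder can write out
a lazy view by streaming its still-encoded bytes through a small buffer,
without decoding and re-encoding the elements.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

class LazyViewEncoding {
  /**
   * Copies the still-encoded bytes backing a lazy view to the output and
   * returns the number of bytes written. Nothing is decoded along the way.
   */
  static long copyEncoded(InputStream encoded, OutputStream out) {
    try {
      byte[] buffer = new byte[8192];
      long total = 0;
      int n;
      while ((n = encoded.read(buffer)) != -1) {
        out.write(buffer, 0, n);
        total += n;
      }
      return total;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```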


On Wed, Nov 28, 2018 at 3:08 PM Robert Bradshaw <ro...@google.com> wrote:

> On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik <lc...@google.com> wrote:
> >
> > Re-adding +datapls-portability-team@google.com +
> datapls-unified-worker@google.com
> >
> > On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> Thanks for bringing this to the list. More below.
> >>
> >> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <ke...@apache.org>
> wrote:
> >>>
> >>> FWIW I deliberately limited the thread to not mix public and private
> lists, so people intending private replies do not accidentally send to
> dev@beam.
> >>>
> >>> I've left them on this time, to avoid contradicting your action, but I
> recommend removing them.
> >>>
> >>> Kenn
> >>>
> >>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:
> >>>>
> >>>> Re-adding +datapls-portability-team@google.com +
> datapls-unified-worker@google.com
> >>>>
> >>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com>
> wrote:
> >>>>>
> >>>>> That is correct Kenn. An important point would be that
> SomeOtherCoder would be given a seekable stream (instead of the forward
> only stream it gets right now) so it can either decode all the data or
> lazily decode parts as it needs to as in the case of an iterable coder when
> used to support large iterables coming out of a GroupByKey.
> >>>>>
> >>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <ke...@apache.org>
> wrote:
> >>>>>>
> >>>>>> Interesting! Having large iterables within rows would be great for
> the interactions between SQL and the core SDK's schema/Row support, and we
> weren't sure how that could work, exactly.
> >>>>>>
> >>>>>> My (very basic) understanding would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
> followed by the encoding of SomeOtherCoder.
> >>>>>>
> >>>>>> So the new proposal would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding where it has a length
> followed by some number of bytes and if it ends with a special token
> (ignoring escaping issues) then you have to gather bytes from more messages
> in order to assemble a stream to send to SomeOtherCoder? Have I got what
> you mean? So this is a different, yet compatible, approach to sending over
> a special token that has to be looked up separately via the state read API?
> >>>>>>
> >>>>>> Kenn
> >>>>>>
> >>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com>
> wrote:
> >>>>>>>
> >>>>>>> There is a discussion happening on a PR 7127[1] where Robert is
> working on providing the first implementation for supporting large
> iterables resulting from a GroupByKey. This is inline with the original
> proposal for remote references over the Fn Data & State API[2].
> >>>>>>>
> >>>>>>> I had thought about this issue more since the original write up
> was done over a year ago and believe that we can simplify the
> implementation by migrating the length prefix coder to be able to embed a
> remote reference token at the end of the stream if the data is too large.
> This allows any coder which supports lazy decoding to return a view over a
> seekable stream instead of decoding all the data (regardless whether all
> the data was sent or there is a state token representing the remote
> reference).
> >>>>>>>
> >>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve
> the large iterable use case but also opens up the ability for types which
> don't need to be fully decoded to provide lazy views. Imagine our Beam rows
> using a format where only rows that are read are decoded while everything
> else is left in its encoded form.
> >>>>>>>
> >>>>>>> I also originally thought that this could also help solve an issue
> where large values[3] need to be chunked across multiple protobuf messages
> over the Data API which complicates the reading side decoding
> implementation since each SDK needs to provide an implementation that
> blocks and waits for the next chunk to come across for the same logical
> stream[4]. But there are issues with this because the runner may make a bad
> coder choice such as iterable<length_prefix<blob>> (instead of
> length_prefix<iterable<blob>>) which can lead to > 2gb of state keys if
> there are many many values.
> >>
> >>
> >> Yes. I think this would need to be a separate coder than the length
> prefix coder.
> >>
> >>
> >>>>>>>
> >>>>>>> Robert, would implementing the length prefix coder being backed by
> state + adding a lazy decoding method to the iterable coder be
> significantly more complicated than what you are proposing right now?
> >>
> >>
> >> Yes, chopping things up at arbitrary byte boundaries (rather than
> element boundaries) tends to be significantly more subtle and complex
> (based on my experience with the data plane API). It would also require new
> public APIs for Coders.
> >
> >
> > After some further thought, I don't think we need to have a different
> API for coders, it's just that they get a different implementation for the
> inputstream when decoding. So the logic would be:
> > public T decode(InputStream is) {
> >   if (is instanceof SeekableInputStream) {
> >     return view((SeekableInputStream) is);
> >   }
> >   return decodeInternal(is);
> > }
>
> SeekableInputStream is a new API. If we went this route of re-using
> decode, it'd be an easy bug to accidentally pass a SeekableInputStream
> to component coders which wouldn't do the right thing. (Perhaps all
> coders would have to be modified?) And encoding is less obvious (e.g.
> a subclass of OutputStream that takes a callback for the rest of the
> bytes? As chosen by the caller or the callee?).
>
> >> This is why I went with the more restricted (but still by far most
> common, and quite straightforward) case of supporting arbitrarily large
> iterables (which can still occur at any level of nesting, e.g. inside
> rows), leaving the general case as future work.
> >>
> >>
> >>>>>>>
> >>>>>>> What do others think about coders supporting a "lazy" decode mode
> in coders?
> >>>>>>>
> >>>>>>> 1: https://github.com/apache/beam/pull/7127
> >>>>>>> 2:
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
> >>>>>>> 3:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
> >>>>>>> 4:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
> >
>

Re: Handling large values

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik <lc...@google.com> wrote:
>
> Re-adding +datapls-portability-team@google.com +datapls-unified-worker@google.com
>
> On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <ro...@google.com> wrote:
>>
>> Thanks for bringing this to the list. More below.
>>
>> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>> FWIW I deliberately limited the thread to not mix public and private lists, so people intending private replies do not accidentally send to dev@beam.
>>>
>>> I've left them on this time, to avoid contradicting your action, but I recommend removing them.
>>>
>>> Kenn
>>>
>>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>> Re-adding +datapls-portability-team@google.com +datapls-unified-worker@google.com
>>>>
>>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>> That is correct Kenn. An important point would be that SomeOtherCoder would be given a seekable stream (instead of the forward only stream it gets right now) so it can either decode all the data or lazily decode parts as it needs to as in the case of an iterable coder when used to support large iterables coming out of a GroupByKey.
>>>>>
>>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>>>>
>>>>>> Interesting! Having large iterables within rows would be great for the interactions between SQL and the core SDK's schema/Row support, and we weren't sure how that could work, exactly.
>>>>>>
>>>>>> My (very basic) understanding would be that LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length followed by the encoding of SomeOtherCoder.
>>>>>>
>>>>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has an encoding where it has a length followed by some number of bytes and if it ends with a special token (ignoring escaping issues) then you have to gather bytes from more messages in order to assemble a stream to send to SomeOtherCoder? Have I got what you mean? So this is a different, yet compatible, approach to sending over a special token that has to be looked up separately via the state read API?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>> There is a discussion happening on a PR 7127[1] where Robert is working on providing the first implementation for supporting large iterables resulting from a GroupByKey. This is inline with the original proposal for remote references over the Fn Data & State API[2].
>>>>>>>
>>>>>>> I had thought about this issue more since the original write up was done over a year ago and believe that we can simplify the implementation by migrating the length prefix coder to be able to embed a remote reference token at the end of the stream if the data is too large. This allows any coder which supports lazy decoding to return a view over a seekable stream instead of decoding all the data (regardless whether all the data was sent or there is a state token representing the remote reference).
>>>>>>>
>>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve the large iterable use case but also opens up the ability for types which don't need to be fully decoded to provide lazy views. Imagine our Beam rows using a format where only rows that are read are decoded while everything else is left in its encoded form.
>>>>>>>
>>>>>>> I also originally thought that this could also help solve an issue where large values[3] need to be chunked across multiple protobuf messages over the Data API which complicates the reading side decoding implementation since each SDK needs to provide an implementation that blocks and waits for the next chunk to come across for the same logical stream[4]. But there are issues with this because the runner may make a bad coder choice such as iterable<length_prefix<blob>> (instead of length_prefix<iterable<blob>>) which can lead to > 2gb of state keys if there are many many values.
>>
>>
>> Yes. I think this would need to be a separate coder from the length prefix coder.
>>
>>
>>>>>>>
>>>>>>> Robert, would implementing the length prefix coder being backed by state + adding a lazy decoding method to the iterable coder be significantly more complicated than what you are proposing right now?
>>
>>
>> Yes, chopping things up at arbitrary byte boundaries (rather than element boundaries) tends to be significantly more subtle and complex (based on my experience with the data plane API). It would also require new public APIs for Coders.
>
>
> After some further thought, I don't think we need to have a different API for coders; it's just that they get a different implementation of the InputStream when decoding. So the logic would be:
> public T decode(InputStream is) {
>   if (is instanceof SeekableInputStream) {
>     return view((SeekableInputStream) is);
>   }
>   return decodeInternal(is);
> }

SeekableInputStream is a new API. If we went this route of re-using
decode, it'd be an easy bug to accidentally pass a SeekableInputStream
to component coders which wouldn't do the right thing. (Perhaps all
coders would have to be modified?) And encoding is less obvious (e.g.
a subclass of OutputStream that takes a callback for the rest of the
bytes? As chosen by the caller or the callee?).

>> This is why I went with the more restricted (but still by far most common, and quite straightforward) case of supporting arbitrarily large iterables (which can still occur at any level of nesting, e.g. inside rows), leaving the general case as future work.
>>
>>
>>>>>>>
>>>>>>> What do others think about coders supporting a "lazy" decode mode in coders?
>>>>>>>
>>>>>>> 1: https://github.com/apache/beam/pull/7127
>>>>>>> 2: https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>>>>>> 3: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>>>>>>> 4: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf

Re: Handling large values

Posted by Lukasz Cwik <lc...@google.com>.
Re-adding +datapls-portability-team@google.com
<da...@google.com> +datapls-unified-worker@google.com
<da...@google.com>

On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <ro...@google.com> wrote:

> Thanks for bringing this to the list. More below.
>
> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> FWIW I deliberately limited the thread to not mix public and private
>> lists, so people intending private replies do not accidentally send to
>> dev@beam.
>>
>> I've left them on this time, to avoid contradicting your action, but I
>> recommend removing them.
>>
>> Kenn
>>
>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Re-adding +datapls-portability-team@google.com
>>> <da...@google.com> +datapls-unified-worker@google.com
>>> <da...@google.com>
>>>
>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> That is correct Kenn. An important point would be that SomeOtherCoder
>>>> would be given a seekable stream (instead of the forward only stream it
>>>> gets right now) so it can either decode all the data or lazily decode parts
>>>> as it needs to as in the case of an iterable coder when used to support
>>>> large iterables coming out of a GroupByKey.
>>>>
>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>>
>>>>> Interesting! Having large iterables within rows would be great for the
>>>>> interactions between SQL and the core SDK's schema/Row support, and we
>>>>> weren't sure how that could work, exactly.
>>>>>
>>>>> My (very basic) understanding would be that
>>>>> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
>>>>> followed by the encoding of SomeOtherCoder.
>>>>>
>>>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder)
>>>>> has an encoding where it has a length followed by some number of bytes and
>>>>> if it ends with a special token (ignoring escaping issues) then you have to
>>>>> gather bytes from more messages in order to assemble a stream to send to
>>>>> SomeOtherCoder? Have I got what you mean? So this is a different, yet
>>>>> compatible, approach to sending over a special token that has to be looked
>>>>> up separately via the state read API?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> There is a discussion happening on a PR 7127[1] where Robert is
>>>>>> working on providing the first implementation for supporting large
>>>>>> iterables resulting from a GroupByKey. This is in line with the original
>>>>>> proposal for remote references over the Fn Data & State API[2].
>>>>>>
>>>>>> I had thought about this issue more since the original write up was
>>>>>> done over a year ago and believe that we can simplify the implementation by
>>>>>> migrating the length prefix coder to be able to embed a remote reference
>>>>>> token at the end of the stream if the data is too large. This allows any
>>>>>> coder which supports lazy decoding to return a view over a seekable stream
>>>>>> instead of decoding all the data (regardless whether all the data was sent
>>>>>> or there is a state token representing the remote reference).
>>>>>>
>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve the
>>>>>> large iterable use case but also opens up the ability for types which don't
>>>>>> need to be fully decoded to provide lazy views. Imagine our Beam rows using
>>>>>> a format where only rows that are read are decoded while everything else is
>>>>>> left in its encoded form.
>>>>>>
>>>>>> I also originally thought that this could also help solve an issue
>>>>>> where large values[3] need to be chunked across multiple protobuf messages
>>>>>> over the Data API which complicates the reading side decoding
>>>>>> implementation since each SDK needs to provide an implementation that
>>>>>> blocks and waits for the next chunk to come across for the same logical
>>>>>> stream[4]. But there are issues with this because the runner may make a bad
>>>>>> coder choice such as iterable<length_prefix<blob>> (instead
>>>>>> of length_prefix<iterable<blob>>) which can lead to > 2gb of state keys if
>>>>>> there are many many values.
>>>>>>
>>>>>
> Yes. I think this would need to be a separate coder from the length prefix
> coder.
>

>>>>>> Robert, would implementing the length prefix coder being backed by state +
>>>>>> adding a lazy decoding method to the iterable coder be significantly more
>>>>>> complicated than what you are proposing right now?
>>>>>>
>>>>>
> Yes, chopping things up at arbitrary byte boundaries (rather than element
> boundaries) tends to be significantly more subtle and complex (based on my
> experience with the data plane API). It would also require new public APIs
> for Coders.
>

After some further thought, I don't think we need to have a different API
for coders; it's just that they get a different implementation of the
InputStream when decoding. So the logic would be:
public T decode(InputStream is) {
  if (is instanceof SeekableInputStream) {
    return view((SeekableInputStream) is);
  }
  return decodeInternal(is);
}
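
For illustration only, here is a slightly fuller sketch of that dispatch. Everything in it is hypothetical: SeekableBytes stands in for the proposed SeekableInputStream (which does not exist yet), DispatchingByteCoder is an invented name, and the "coder" just treats every byte as one element. It only shows the shape of the idea: the same decode entry point returns either an eagerly built list or a lazy view, depending on the stream it is handed.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed SeekableInputStream.
final class SeekableBytes extends ByteArrayInputStream {
  SeekableBytes(byte[] data) {
    super(data);
  }

  // Moves the read position to an absolute offset.
  void seek(int position) {
    this.pos = position;
  }

  // Total number of bytes backing this stream.
  int length() {
    return this.count;
  }
}

// Toy coder whose elements are single unsigned bytes.
final class DispatchingByteCoder {
  // Single public entry point, mirroring the decode(InputStream) idea.
  static List<Integer> decode(InputStream in) {
    if (in instanceof SeekableBytes) {
      return view((SeekableBytes) in);
    }
    return decodeInternal(in);
  }

  // Eager path: consume the forward-only stream completely.
  static List<Integer> decodeInternal(InputStream in) {
    try {
      List<Integer> out = new ArrayList<>();
      for (int b = in.read(); b != -1; b = in.read()) {
        out.add(b);
      }
      return out;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Lazy path: nothing is read until an element is requested.
  static List<Integer> view(SeekableBytes in) {
    return new AbstractList<Integer>() {
      @Override
      public Integer get(int index) {
        if (index < 0 || index >= in.length()) {
          throw new IndexOutOfBoundsException();
        }
        in.seek(index);
        return in.read();
      }

      @Override
      public int size() {
        return in.length();
      }
    };
  }
}
```

Calling decode with a plain ByteArrayInputStream takes the eager path, while passing a SeekableBytes returns the view.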

> This is why I went with the more restricted (but still by far most common,
> and quite straightforward) case of supporting arbitrarily large iterables
> (which can still occur at any level of nesting, e.g. inside rows), leaving
> the general case as future work.
>

>>>>>> What do others think about coders supporting a "lazy" decode mode in
>>>>>> coders?
>>>>>>
>>>>>> 1: https://github.com/apache/beam/pull/7127
>>>>>> 2:
>>>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>>>>> 3:
>>>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>>>>>> 4:
>>>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>>>>>>
>>>>>

Re: Handling large values

Posted by Robert Bradshaw <ro...@google.com>.
Thanks for bringing this to the list. More below.

On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <ke...@apache.org> wrote:

> FWIW I deliberately limited the thread to not mix public and private
> lists, so people intending private replies do not accidentally send to
> dev@beam.
>
> I've left them on this time, to avoid contradicting your action, but I
> recommend removing them.
>
> Kenn
>
> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Re-adding +datapls-portability-team@google.com
>> <da...@google.com> +datapls-unified-worker@google.com
>> <da...@google.com>
>>
>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> That is correct Kenn. An important point would be that SomeOtherCoder
>>> would be given a seekable stream (instead of the forward only stream it
>>> gets right now) so it can either decode all the data or lazily decode parts
>>> as it needs to as in the case of an iterable coder when used to support
>>> large iterables coming out of a GroupByKey.
>>>
>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <ke...@apache.org>
>>> wrote:
>>>
>>>> Interesting! Having large iterables within rows would be great for the
>>>> interactions between SQL and the core SDK's schema/Row support, and we
>>>> weren't sure how that could work, exactly.
>>>>
>>>> My (very basic) understanding would be that
>>>> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
>>>> followed by the encoding of SomeOtherCoder.
>>>>
>>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder)
>>>> has an encoding where it has a length followed by some number of bytes and
>>>> if it ends with a special token (ignoring escaping issues) then you have to
>>>> gather bytes from more messages in order to assemble a stream to send to
>>>> SomeOtherCoder? Have I got what you mean? So this is a different, yet
>>>> compatible, approach to sending over a special token that has to be looked
>>>> up separately via the state read API?
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> There is a discussion happening on a PR 7127[1] where Robert is
>>>>> working on providing the first implementation for supporting large
>>>>> iterables resulting from a GroupByKey. This is in line with the original
>>>>> proposal for remote references over the Fn Data & State API[2].
>>>>>
>>>>> I had thought about this issue more since the original write up was
>>>>> done over a year ago and believe that we can simplify the implementation by
>>>>> migrating the length prefix coder to be able to embed a remote reference
>>>>> token at the end of the stream if the data is too large. This allows any
>>>>> coder which supports lazy decoding to return a view over a seekable stream
>>>>> instead of decoding all the data (regardless whether all the data was sent
>>>>> or there is a state token representing the remote reference).
>>>>>
>>>>> Allowing any arbitrary coder to support lazy decoding helps solve the
>>>>> large iterable use case but also opens up the ability for types which don't
>>>>> need to be fully decoded to provide lazy views. Imagine our Beam rows using
>>>>> a format where only rows that are read are decoded while everything else is
>>>>> left in its encoded form.
>>>>>
>>>>> I also originally thought that this could also help solve an issue
>>>>> where large values[3] need to be chunked across multiple protobuf messages
>>>>> over the Data API which complicates the reading side decoding
>>>>> implementation since each SDK needs to provide an implementation that
>>>>> blocks and waits for the next chunk to come across for the same logical
>>>>> stream[4]. But there are issues with this because the runner may make a bad
>>>>> coder choice such as iterable<length_prefix<blob>> (instead
>>>>> of length_prefix<iterable<blob>>) which can lead to > 2gb of state keys if
>>>>> there are many many values.
>>>>>
>>>>
Yes. I think this would need to be a separate coder from the length prefix
coder.


>>>>> Robert, would implementing the length prefix coder being backed by state +
>>>>> adding a lazy decoding method to the iterable coder be significantly more
>>>>> complicated than what you are proposing right now?
>>>>>
>>>>
Yes, chopping things up at arbitrary byte boundaries (rather than element
boundaries) tends to be significantly more subtle and complex (based on my
experience with the data plane API). It would also require new public APIs
for Coders.

This is why I went with the more restricted (but still by far most common,
and quite straightforward) case of supporting arbitrarily large iterables
(which can still occur at any level of nesting, e.g. inside rows), leaving
the general case as future work.
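
As a rough sketch of why element boundaries are the easier place to cut, the hypothetical helper below packs already-encoded elements into chunks without ever splitting one. ElementChunker and maxChunkBytes are names invented for this example and are not part of the PR. A reader can then decode each chunk on its own, with no partial element to stitch together across messages.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not taken from the Beam codebase.
final class ElementChunker {
  // Groups encoded elements into chunks of at most maxChunkBytes each.
  // An element larger than the limit is placed in a chunk of its own
  // rather than being split.
  static List<List<byte[]>> chunk(List<byte[]> elements, int maxChunkBytes) {
    List<List<byte[]>> chunks = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    int currentBytes = 0;
    for (byte[] element : elements) {
      // Close the current chunk if adding this element would overflow it.
      if (!current.isEmpty() && currentBytes + element.length > maxChunkBytes) {
        chunks.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(element);
      currentBytes += element.length;
    }
    if (!current.isEmpty()) {
      chunks.add(current);
    }
    return chunks;
  }
}
```

Cutting at arbitrary byte offsets instead would require the reader to buffer the tail of one chunk until the rest of the element arrives.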


>>>>> What do others think about coders supporting a "lazy" decode mode in
>>>>> coders?
>>>>>
>>>>> 1: https://github.com/apache/beam/pull/7127
>>>>> 2:
>>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>>>> 3:
>>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>>>>> 4:
>>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>>>>>
>>>>

Re: Handling large values

Posted by Kenneth Knowles <ke...@apache.org>.
FWIW I deliberately limited the thread to not mix public and private lists,
so people intending private replies do not accidentally send to dev@beam.

I've left them on this time, to avoid contradicting your action, but I
recommend removing them.

Kenn

On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:

> Re-adding +datapls-portability-team@google.com
> <da...@google.com> +datapls-unified-worker@google.com
> <da...@google.com>
>
> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> That is correct Kenn. An important point would be that SomeOtherCoder
>> would be given a seekable stream (instead of the forward only stream it
>> gets right now) so it can either decode all the data or lazily decode parts
>> as it needs to as in the case of an iterable coder when used to support
>> large iterables coming out of a GroupByKey.
>>
>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> Interesting! Having large iterables within rows would be great for the
>>> interactions between SQL and the core SDK's schema/Row support, and we
>>> weren't sure how that could work, exactly.
>>>
>>> My (very basic) understanding would be that
>>> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
>>> followed by the encoding of SomeOtherCoder.
>>>
>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder)
>>> has an encoding where it has a length followed by some number of bytes and
>>> if it ends with a special token (ignoring escaping issues) then you have to
>>> gather bytes from more messages in order to assemble a stream to send to
>>> SomeOtherCoder? Have I got what you mean? So this is a different, yet
>>> compatible, approach to sending over a special token that has to be looked
>>> up separately via the state read API?
>>>
>>> Kenn
>>>
>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> There is a discussion happening on a PR 7127[1] where Robert is working
>>>> on providing the first implementation for supporting large iterables
>>>> resulting from a GroupByKey. This is in line with the original proposal for
>>>> remote references over the Fn Data & State API[2].
>>>>
>>>> I had thought about this issue more since the original write up was
>>>> done over a year ago and believe that we can simplify the implementation by
>>>> migrating the length prefix coder to be able to embed a remote reference
>>>> token at the end of the stream if the data is too large. This allows any
>>>> coder which supports lazy decoding to return a view over a seekable stream
>>>> instead of decoding all the data (regardless whether all the data was sent
>>>> or there is a state token representing the remote reference).
>>>>
>>>> Allowing any arbitrary coder to support lazy decoding helps solve the
>>>> large iterable use case but also opens up the ability for types which don't
>>>> need to be fully decoded to provide lazy views. Imagine our Beam rows using
>>>> a format where only rows that are read are decoded while everything else is
>>>> left in its encoded form.
>>>>
>>>> I also originally thought that this could also help solve an issue
>>>> where large values[3] need to be chunked across multiple protobuf messages
>>>> over the Data API which complicates the reading side decoding
>>>> implementation since each SDK needs to provide an implementation that
>>>> blocks and waits for the next chunk to come across for the same logical
>>>> stream[4]. But there are issues with this because the runner may make a bad
>>>> coder choice such as iterable<length_prefix<blob>> (instead
>>>> of length_prefix<iterable<blob>>) which can lead to > 2gb of state keys if
>>>> there are many many values.
>>>>
>>>> Robert, would implementing the length prefix coder being backed by
>>>> state + adding a lazy decoding method to the iterable coder be
>>>> significantly more complicated than what you are proposing right now?
>>>>
>>>> What do others think about coders supporting a "lazy" decode mode in
>>>> coders?
>>>>
>>>> 1: https://github.com/apache/beam/pull/7127
>>>> 2:
>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>>> 3:
>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>>>> 4:
>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>>>>
>>>

Re: Handling large values

Posted by Lukasz Cwik <lc...@google.com>.
Re-adding +datapls-portability-team@google.com
<da...@google.com> +datapls-unified-worker@google.com
<da...@google.com>

On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> wrote:

> That is correct Kenn. An important point would be that SomeOtherCoder
> would be given a seekable stream (instead of the forward only stream it
> gets right now) so it can either decode all the data or lazily decode parts
> as it needs to as in the case of an iterable coder when used to support
> large iterables coming out of a GroupByKey.
>
> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> Interesting! Having large iterables within rows would be great for the
>> interactions between SQL and the core SDK's schema/Row support, and we
>> weren't sure how that could work, exactly.
>>
>> My (very basic) understanding would be that
>> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
>> followed by the encoding of SomeOtherCoder.
>>
>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has
>> an encoding where it has a length followed by some number of bytes and if
>> it ends with a special token (ignoring escaping issues) then you have to
>> gather bytes from more messages in order to assemble a stream to send to
>> SomeOtherCoder? Have I got what you mean? So this is a different, yet
>> compatible, approach to sending over a special token that has to be looked
>> up separately via the state read API?
>>
>> Kenn
>>
>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> There is a discussion happening on a PR 7127[1] where Robert is working
>>> on providing the first implementation for supporting large iterables
>>> resulting from a GroupByKey. This is in line with the original proposal for
>>> remote references over the Fn Data & State API[2].
>>>
>>> I had thought about this issue more since the original write up was done
>>> over a year ago and believe that we can simplify the implementation by
>>> migrating the length prefix coder to be able to embed a remote reference
>>> token at the end of the stream if the data is too large. This allows any
>>> coder which supports lazy decoding to return a view over a seekable stream
>>> instead of decoding all the data (regardless whether all the data was sent
>>> or there is a state token representing the remote reference).
>>>
>>> Allowing any arbitrary coder to support lazy decoding helps solve the
>>> large iterable use case but also opens up the ability for types which don't
>>> need to be fully decoded to provide lazy views. Imagine our Beam rows using
>>> a format where only rows that are read are decoded while everything else is
>>> left in its encoded form.
>>>
>>> I also originally thought that this could also help solve an issue where
>>> large values[3] need to be chunked across multiple protobuf messages over
>>> the Data API which complicates the reading side decoding implementation
>>> since each SDK needs to provide an implementation that blocks and waits for
>>> the next chunk to come across for the same logical stream[4]. But there are
>>> issues with this because the runner may make a bad coder choice such
>>> as iterable<length_prefix<blob>> (instead of length_prefix<iterable<blob>>)
>>> which can lead to > 2gb of state keys if there are many many values.
>>>
>>> Robert, would implementing the length prefix coder being backed by
>>> state + adding a lazy decoding method to the iterable coder be
>>> significantly more complicated than what you are proposing right now?
>>>
>>> What do others think about coders supporting a "lazy" decode mode in
>>> coders?
>>>
>>> 1: https://github.com/apache/beam/pull/7127
>>> 2:
>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>> 3:
>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>>> 4:
>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>>>
>>

Re: Handling large values

Posted by Lukasz Cwik <lc...@google.com>.
That is correct Kenn. An important point would be that SomeOtherCoder would
be given a seekable stream (instead of the forward only stream it gets
right now) so it can either decode all the data or lazily decode parts as
it needs to as in the case of an iterable coder when used to support large
iterables coming out of a GroupByKey.
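
To make the lazy-decoding idea a bit more concrete, here is a small hypothetical sketch of an iterable that keeps its elements encoded and decodes each one only when the iterator reaches it. LazyStringIterable, the 4-byte length framing, and the decodedCount counter are all invented for illustration, and a ByteBuffer stands in for the seekable stream.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical lazy view over encoded strings; not a Beam class.
final class LazyStringIterable implements Iterable<String> {
  private final ByteBuffer encoded;
  // Counts decoded elements, only to make the laziness observable.
  int decodedCount = 0;

  LazyStringIterable(ByteBuffer encoded) {
    this.encoded = encoded.asReadOnlyBuffer();
  }

  @Override
  public Iterator<String> iterator() {
    // Each iterator gets its own cursor over the shared encoded bytes.
    ByteBuffer cursor = encoded.duplicate();
    return new Iterator<String>() {
      @Override
      public boolean hasNext() {
        return cursor.hasRemaining();
      }

      @Override
      public String next() {
        if (!cursor.hasRemaining()) {
          throw new NoSuchElementException();
        }
        // Assumed framing: 4-byte big-endian length, then UTF-8 bytes.
        int length = cursor.getInt();
        byte[] bytes = new byte[length];
        cursor.get(bytes);
        decodedCount++;
        return new String(bytes, StandardCharsets.UTF_8);
      }
    };
  }

  // Builds the assumed framing for a few values, for demonstration.
  static ByteBuffer encode(String... values) {
    byte[][] parts = new byte[values.length][];
    int total = 0;
    for (int i = 0; i < values.length; i++) {
      parts[i] = values[i].getBytes(StandardCharsets.UTF_8);
      total += Integer.BYTES + parts[i].length;
    }
    ByteBuffer out = ByteBuffer.allocate(total);
    for (byte[] part : parts) {
      out.putInt(part.length);
      out.put(part);
    }
    out.flip();
    return out;
  }
}
```

A consumer that stops after the first element never pays for decoding the rest, which is the property that matters for very large iterables.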

On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <ke...@apache.org> wrote:

> Interesting! Having large iterables within rows would be great for the
> interactions between SQL and the core SDK's schema/Row support, and we
> weren't sure how that could work, exactly.
>
> My (very basic) understanding would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
> followed by the encoding of SomeOtherCoder.
>
> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has
> an encoding where it has a length followed by some number of bytes and if
> it ends with a special token (ignoring escaping issues) then you have to
> gather bytes from more messages in order to assemble a stream to send to
> SomeOtherCoder? Have I got what you mean? So this is a different, yet
> compatible, approach to sending over a special token that has to be looked
> up separately via the state read API?
>
> Kenn
>
> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> There is a discussion happening on a PR 7127[1] where Robert is working
>> on providing the first implementation for supporting large iterables
>> resulting from a GroupByKey. This is in line with the original proposal for
>> remote references over the Fn Data & State API[2].
>>
>> I had thought about this issue more since the original write up was done
>> over a year ago and believe that we can simplify the implementation by
>> migrating the length prefix coder to be able to embed a remote reference
>> token at the end of the stream if the data is too large. This allows any
>> coder which supports lazy decoding to return a view over a seekable stream
>> instead of decoding all the data (regardless whether all the data was sent
>> or there is a state token representing the remote reference).
>>
>> Allowing any arbitrary coder to support lazy decoding helps solve the
>> large iterable use case but also opens up the ability for types which don't
>> need to be fully decoded to provide lazy views. Imagine our Beam rows using
>> a format where only rows that are read are decoded while everything else is
>> left in its encoded form.
>>
>> I also originally thought that this could also help solve an issue where
>> large values[3] need to be chunked across multiple protobuf messages over
>> the Data API which complicates the reading side decoding implementation
>> since each SDK needs to provide an implementation that blocks and waits for
>> the next chunk to come across for the same logical stream[4]. But there are
>> issues with this because the runner may make a bad coder choice such
>> as iterable<length_prefix<blob>> (instead of length_prefix<iterable<blob>>)
>> which can lead to > 2gb of state keys if there are many many values.
>>
>> Robert, would implementing the length prefix coder being backed by
>> state + adding a lazy decoding method to the iterable coder be
>> significantly more complicated than what you are proposing right now?
>>
>> What do others think about coders supporting a "lazy" decode mode in
>> coders?
>>
>> 1: https://github.com/apache/beam/pull/7127
>> 2:
>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>> 3:
>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>> 4:
>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>>
>

Re: Handling large values

Posted by Kenneth Knowles <ke...@apache.org>.
Interesting! Having large iterables within rows would be great for the
interactions between SQL and the core SDK's schema/Row support, and we
weren't sure how that could work, exactly.

My (very basic) understanding would be that
LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
followed by the encoding of SomeOtherCoder.
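
As a rough illustration of that layout (a length followed by the inner coder's bytes), here is a minimal sketch. The class name LengthPrefixSketch and the choice of a base-128 varint for the length are assumptions made for this example rather than something stated in this thread.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical illustration of a length-prefixed encoding.
final class LengthPrefixSketch {
  // Writes the payload length as a base-128 varint, then the payload.
  static byte[] encode(byte[] payload) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int remaining = payload.length;
    while ((remaining & ~0x7F) != 0) {
      // Low 7 bits with the continuation bit set.
      out.write((remaining & 0x7F) | 0x80);
      remaining >>>= 7;
    }
    out.write(remaining);
    out.write(payload, 0, payload.length);
    return out.toByteArray();
  }

  // Reads one length-prefixed payload starting at the buffer's position.
  static byte[] decode(ByteBuffer in) {
    int length = 0;
    int shift = 0;
    while (true) {
      int b = in.get() & 0xFF;
      length |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        break;
      }
      shift += 7;
    }
    byte[] payload = new byte[length];
    in.get(payload);
    return payload;
  }
}
```

Because the reader learns the length up front, it can skip over or hand off the payload bytes without understanding the inner coder.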

So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has
an encoding where it has a length followed by some number of bytes and if
it ends with a special token (ignoring escaping issues) then you have to
gather bytes from more messages in order to assemble a stream to send to
SomeOtherCoder? Have I got what you mean? So this is a different, yet
compatible, approach to sending over a special token that has to be looked
up separately via the state read API?

Kenn

On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:

> There is a discussion happening on a PR 7127[1] where Robert is working on
> providing the first implementation for supporting large iterables resulting
> from a GroupByKey. This is in line with the original proposal for remote
> references over the Fn Data & State API[2].
>
> I had thought about this issue more since the original write up was done
> over a year ago and believe that we can simplify the implementation by
> migrating the length prefix coder to be able to embed a remote reference
> token at the end of the stream if the data is too large. This allows any
> coder which supports lazy decoding to return a view over a seekable stream
> instead of decoding all the data (regardless whether all the data was sent
> or there is a state token representing the remote reference).
>
> Allowing any arbitrary coder to support lazy decoding helps solve the
> large iterable use case but also opens up the ability for types which don't
> need to be fully decoded to provide lazy views. Imagine our Beam rows using
> a format where only rows that are read are decoded while everything else is
> left in its encoded form.
>
> I also originally thought that this could also help solve an issue where
> large values[3] need to be chunked across multiple protobuf messages over
> the Data API which complicates the reading side decoding implementation
> since each SDK needs to provide an implementation that blocks and waits for
> the next chunk to come across for the same logical stream[4]. But there are
> issues with this because the runner may make a bad coder choice such
> as iterable<length_prefix<blob>> (instead of length_prefix<iterable<blob>>)
> which can lead to > 2gb of state keys if there are many many values.
>
> Robert, would implementing the length prefix coder being backed by state +
> adding a lazy decoding method to the iterable coder be significantly more
> complicated than what you are proposing right now?
>
> What do others think about coders supporting a "lazy" decode mode in
> coders?
>
> 1: https://github.com/apache/beam/pull/7127
> 2:
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
> 3:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
> 4:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>