Posted to dev@flink.apache.org by Dawid Wysakowicz <dw...@apache.org> on 2020/09/01 06:49:06 UTC

[DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Hi devs,

As described in FLIP-131 [1], we intend to deprecate and remove the
DataSet API in the future in favour of the DataStream API for both
bounded/batch and unbounded/streaming jobs. Ideally, bounded DataStream
programs should stay in the same performance ballpark as the equivalent
DataSet programs.

One idea for getting there is to sort the input before keyed
operators and replace the StateBackend with a simplified one. In other
words, you could see that as a switch from hash-based aggregations with
quite costly StateBackends (RocksDB) to sort-based aggregations that
are performed purely in memory. You can find more details in FLIP-140 [2].
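
As a rough illustration of the idea above (a minimal sketch, not code
from the FLIP): once records are sorted by key, an aggregation only has
to hold the state of one key at a time, so no hash-based state backend
is needed. The class and method names are hypothetical, and plain Java
collections stand in for Flink's sorter and state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink code: a sort-based per-key sum.
final class SortedSingleKeySum {

    // Sorts records by key, then sums values while keeping state for one key only.
    static List<Map.Entry<String, Integer>> sumPerKey(List<Map.Entry<String, Integer>> records) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(records);
        sorted.sort(Map.Entry.comparingByKey());

        List<Map.Entry<String, Integer>> result = new ArrayList<>();
        String currentKey = null; // the only key whose state is held in memory
        int currentSum = 0;
        for (Map.Entry<String, Integer> record : sorted) {
            if (currentKey != null && !currentKey.equals(record.getKey())) {
                // The key changed: emit the finished key and discard its state.
                result.add(Map.entry(currentKey, currentSum));
                currentSum = 0;
            }
            currentKey = record.getKey();
            currentSum += record.getValue();
        }
        if (currentKey != null) {
            result.add(Map.entry(currentKey, currentSum));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("b", 1), Map.entry("a", 2), Map.entry("b", 3), Map.entry("a", 4));
        System.out.println(sumPerKey(input)); // prints [a=6, b=4]
    }
}
```

A hash-based variant would instead keep one accumulator per distinct key
in a map, which is what makes spilling necessary for large key spaces.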

The FLIP contains some open questions on which I'd really appreciate
input from the community. Some of the questions include:

 1. How to sort/group keys? What representation of the key should we
    use? Should we sort on the binary form or should we depend on
    Comparators being available?
 2. Where in the stack should we apply the sorting? (This is rather a
    discussion about internals.)
 3. How should we deal with custom implementations of StreamOperators?

I am really looking forward to all your feedback!

Best,

Dawid

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741&src=contextnavpagetreemode

[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+bounded+style+execution+for+keyed+streams


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Dawid Wysakowicz <dw...@apache.org>.
That's for sure. I am not arguing against it. What I am saying is that
we don't necessarily need a true "sorting" in this particular use case.
We only need to cluster records with the same key together; we don't
need the keys to be logically sorted. For clustering the keys, a binary
order is enough. I agree this would not work if we were to implement
an operation such as DataStream#sort.
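
To make the distinction concrete, here is a minimal sketch (an
illustration under stated assumptions, not the FLIP's implementation).
It sorts keys by the unsigned lexicographic order of their serialized
bytes. The 4-byte big-endian int encoding and all names are
hypothetical stand-ins for a real TypeSerializer. The result is not in
logical order (-1 ends up after 3), but equal keys are still adjacent,
which is all that grouping needs.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: grouping keys via the binary order of their serialized form.
final class BinaryOrderGrouping {

    // Assumed serialization format: 4-byte big-endian int.
    static byte[] serialize(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    static int deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Sorts by unsigned lexicographic byte order and returns the deserialized keys.
    static List<Integer> sortByBinaryForm(int... keys) {
        List<byte[]> serialized = new ArrayList<>();
        for (int key : keys) {
            serialized.add(serialize(key));
        }
        // Arrays.compareUnsigned(byte[], byte[]) is available since Java 9.
        serialized.sort((a, b) -> Arrays.compareUnsigned(a, b));

        List<Integer> result = new ArrayList<>();
        for (byte[] bytes : serialized) {
            result.add(deserialize(bytes));
        }
        return result;
    }

    public static void main(String[] args) {
        // -1 serializes to FF FF FF FF, so it sorts after 3 in binary order.
        System.out.println(sortByBinaryForm(3, -1, 3, 0, -1)); // prints [0, 3, 3, -1, -1]
    }
}
```

This only works if the serialized form is deterministic, i.e. equal keys
always produce identical bytes.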

Best,

Dawid

On 09/09/2020 08:22, Kurt Young wrote:
> I doubt that any sorting algorithm would work knowing only that the
> keys are different, without information about which is greater.
>  
> Best,
> Kurt
>
>
> On Tue, Sep 8, 2020 at 10:59 PM Dawid Wysakowicz
> <dwysakowicz@apache.org> wrote:
>
>     Ad. 1
>
>     Yes, you are right in principle.
>
>     Let me clarify my proposal a bit, though. The proposed sort-style
>     execution aims at a generic KeyedProcessFunction where all the
>     "aggregations" are actually performed in the user code. It tries to
>     improve the performance by removing the need to use RocksDB, e.g.:
>
>         private static final class Summer<K>
>                 extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>
>             ....
>
>             @Override
>             public void processElement(
>                     Tuple2<K, Integer> value,
>                     Context ctx,
>                     Collector<Tuple2<K, Integer>> out) throws Exception {
>                 if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>                     ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>                     timerRegistered.update(true);
>                 }
>                 Integer v = counter.value();
>                 Integer incomingValue = value.f1;
>                 if (v != null) {
>                     v += incomingValue;
>                 } else {
>                     v = incomingValue;
>                 }
>                 counter.update(v);
>             }
>
>             ....
>
>        }
>
>     Therefore I don't think the first part of your reply, about
>     separating the write and read workloads, applies here. We do not aim
>     to create an API that competes with the Table API. We think operations
>     such as joins or analytical aggregations should be performed in the
>     Table API.
>
>     As for the second part I agree it would be nice to fall back to the
>     sorting approach only if a certain threshold of memory in a State
>     Backend is used. This has some problems though. We would need a way to
>     estimate the size of the occupied memory to tell when the threshold is
>     reached. That is not easily doable by default e.g. in a
>     MemoryStateBackend, as we do not serialize the values in the state
>     backend by default. We would have to add that, but this would add the
>     overhead of the serialization.
>
>     This proposal aims at the cases where we do have a large state that
>     will not fit into memory and where, without the change, users are
>     forced to use RocksDB. If the state fits in memory, I agree it will
>     be better to do hash-based aggregations, e.g. using the
>     MemoryStateBackend. Therefore I think it is important to give users
>     the choice to use one or the other approach. We might discuss which
>     approach should be the default for RuntimeMode.BATCH proposed in
>     FLIP-134: should it be hash-based with a user-configured state
>     backend, or sorting-based with a single-key-at-a-time backend?
>     Moreover, we could consider letting users choose the sort vs. hash
>     "state backend" per operator. Would that suffice?
>
>     Ad. 2
>
>     I still think we can just use the first X bytes of the serialized form
>     as the normalized key and fall back to comparing full keys on
>     clashes. That is because we are not actually interested in a logical
>     order; we care only about the "grouping" aspect of the sorting.
>     Therefore I think it's enough to compare only part of the full key
>     as the normalized key.
>
>     Thanks again for the really nice and thorough feedback!
>
>     Best,
>
>     Dawid
>
>     On 08/09/2020 14:47, Kurt Young wrote:
>     > Regarding #1, yes, the state backend is definitely hash-based
>     > execution. However, there are some differences from batch
>     > hash-based execution. The key difference is *random access &
>     > read/write mixed workload*. For example, when using a state backend
>     > in streaming execution, one has to mix the read and write
>     > operations, and all of them are actually random access. But in a
>     > batch hash execution, we could divide the phases into write and
>     > read. For example, we can build the hash table first, with only
>     > write operations. And once the build is done, we can start to read
>     > and trigger the user code.
>     > Take the hash aggregation which the blink planner implemented as
>     > an example: during the building phase, as long as the hash map
>     > fits into memory, we update the accumulators directly in the hash
>     > map. Once we run out of memory, we fall back to sort-based
>     > execution. It improves the performance a lot if the incoming data
>     > can be processed in memory.
>     >
>     > Regarding #2, IIUC you are actually describing a binary format of
>     > the key, not the normalized key which is used in DataSet. I will
>     > take String as an example. Suppose we have lots of keys whose
>     > lengths are all greater than, let's say, 20. In your proposal, you
>     > will encode the whole string in the prefix of your composed data
>     > ( <key> + <timestamp> + <record> ). And when you compare records,
>     > you will actually compare the *whole* key of the record. The
>     > normalized key is fixed-length in this case; IIRC it will take 8
>     > bytes to represent the string. The sorter will store the
>     > normalized key and offset in a dedicated array. When doing the
>     > sorting, it only sorts this *small* array. If the normalized keys
>     > are different, you can immediately tell which is greater from the
>     > normalized keys. You only have to compare the full keys if the
>     > normalized keys are equal and you know that in this case the
>     > normalized key couldn't represent the full key. The reason why
>     > DataSet is doing this is that sorting the *small* array is super
>     > cache efficient. The idea is borrowed from this paper [1]. Let me
>     > know if I missed or misunderstood anything.
>     >
>     > [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
>     > cache-sensitive parallel external sort)
>     >
>     > Best,
>     > Kurt
>     >
>     >
>     > On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakowicz@apache.org>
>     > wrote:
>     >
>     >> Hey Kurt,
>     >>
>     >> Thank you for comments!
>     >>
>     >> Ad. 1 I might have missed something here, but as far as I can
>     >> see, using the current execution stack with regular state
>     >> backends (RocksDB in particular, if we want to have spilling
>     >> capabilities) is equivalent to hash-based execution. I can see a
>     >> different spilling state backend implementation in the future,
>     >> but I think it is not batch specific. Or am I missing something?
>     >>
>     >> Ad. 2 Totally agree that normalized keys are important for
>     >> performance. I think, though, that TypeComparators are not
>     >> necessary for that. Actually, this proposal is heading towards
>     >> only ever performing "normalized keys" comparison. I have not
>     >> included in the proposal the binary format which we will use for
>     >> sorting (partially because I forgot, and partially because I
>     >> thought it was too much of an implementation detail). Let me
>     >> include it here though, as it might clarify the situation a bit.
>     >>
>     >> In DataSet, at times we have KeySelectors which extract keys
>     >> based on field indices or names. This allows us, in certain
>     >> situations, to extract the key from serialized records. In
>     >> DataStream, by contrast, the key is always described with a
>     >> black-box KeySelector, in other words with a function which
>     >> extracts a key from a deserialized record. In turn, there is no
>     >> way to create a comparator that could compare records by
>     >> extracting the key from a serialized record (neither with nor
>     >> without key normalization). We suggest that the input for the
>     >> sorter will be
>     >>
>     >> <key> + <timestamp> + <record>
>     >>
>     >> Without having the key prepended we would have to deserialize
>     >> the record for every key comparison.
>     >>
>     >> Therefore if we agree that we perform binary comparison for
>     >> keys (which are always prepended), it is actually equivalent to
>     >> a DataSet with TypeComparators that support key normalization.
>     >>
>     >> Let me know if that is clear, or I have missed something here.
>     >>
>     >> Best,
>     >>
>     >> Dawid
>     >>
>     >> On 08/09/2020 03:39, Kurt Young wrote:
>     >>> Hi Dawid, thanks for bringing this up. It's really exciting to
>     >>> see that batch execution is being introduced in DataStream. From
>     >>> the FLIP, it seems we are sticking with a sort-based execution
>     >>> mode (at least for now), which will sort the whole input data
>     >>> before any *keyed* operation is executed. I have two comments
>     >>> here:
>     >>>
>     >>> 1. Do we want to introduce hash-based execution in the future?
>     >>> Sort is a safe choice but not the best in lots of cases. IIUC we
>     >>> only need to make sure that before the framework finishes
>     >>> dealing with one key, the operator doesn't see any data
>     >>> belonging to other keys; thus hash-based execution would also do
>     >>> the trick. One tricky thing the framework might need to deal
>     >>> with is memory constraints and spilling in the hash map, but
>     >>> Flink also has some good knowledge about these things.
>     >>>
>     >>> 2. Going back to sort-based execution and how to sort keys: from
>     >>> my experience, the performance of sorting would be one of the
>     >>> most important things if we want to achieve good performance of
>     >>> batch execution. And normalized keys are actually the key to the
>     >>> performance of sorting. If we want to get rid of TypeComparator,
>     >>> I think we still need to find a way to bring this back.
>     >>>
>     >>> Best,
>     >>> Kurt
>     >>>
>     >>>
>     >>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <aljoscha@apache.org> wrote:
>     >>>> Yes, I think we can address the problem of indeterminacy in a
>     separate
>     >>>> FLIP because we're already in it.
>     >>>>
>     >>>> Aljoscha
>     >>>>
>     >>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>     >>>>> @Seth That's a very good point. I agree that RocksDB has the
>     same
>     >>>>> problem. I think we can use the same approach for the sorted
>     shuffles
>     >>>>> then. @Aljoscha I agree we should think about making it more
>     resilient,
>     >>>>> as I guess users might have problems already if they use
>     keys with
>     >>>>> non-deterministic binary representation. How do you feel about
>     >>>>> addressing that separately purely to limit the scope of this
>     FLIP?
>     >>>>>
>     >>>>> @Aljoscha I tend to agree with you that the best place to
>     actually
>     >> place
>     >>>>> the sorting would be in the InputProcessor(s). If there are
>     no more
>     >>>>> suggestions in respect to that issue. I'll put this proposal for
>     >> voting.
>     >>>>> @all Thank you for the feedback so far. I'd like to start a
>     voting
>     >>>>> thread on the proposal tomorrow. Therefore I'd appreciate if you
>     >> comment
>     >>>>> before that, if you still have some outstanding ideas.
>     >>>>>
>     >>>>> Best,
>     >>>>>
>     >>>>> Dawid
>     >>>>>
>     >>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>     >>>>>> Seth is right, I was just about to write that as well.
>     There is a
>     >>>>>> problem, though, because some of our TypeSerializers are not
>     >>>>>> deterministic even though we use them as if they were. Beam
>     excludes
>     >>>>>> the FloatCoder, for example, and the AvroCoder in certain
>     cases. I'm
>     >>>>>> pretty sure there is also weirdness going on in our
>     KryoSerializer.
>     >>>>>>
>     >>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>     >>>>>>> There is already an implicit assumption the TypeSerializer
>     for keys
>     >> is
>     >>>>>>> stable/deterministic, RocksDB compares keys using their
>     serialized
>     >> byte
>     >>>>>>> strings. I think this is a non-issue (or at least it's not
>     changing
>     >> the
>     >>>>>>> status quo).
>     >>>>>>>
>     >>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twalthr@apache.org> wrote:
>     >>>>>>>> +1 for getting rid of the TypeComparator interface and
>     >>>>>>>> relying on the serialized representation for grouping.
>     >>>>>>>>
>     >>>>>>>> Adding a new type to the DataStream API is quite difficult
>     >>>>>>>> at the moment due to the many components that are
>     >>>>>>>> required: TypeInformation (tries to deal with logical
>     >>>>>>>> fields for TypeComparators), TypeSerializer (incl. its
>     >>>>>>>> snapshot interfaces), and TypeComparator (with many
>     >>>>>>>> methods and internals such as normalized keys etc.).
>     >>>>>>>>
>     >>>>>>>> If necessary, we can add more simple comparison-related
>     methods to
>     >> the
>     >>>>>>>> TypeSerializer interface itself in the future (like
>     >>>>>>>> TypeSerializer.isDeterministic).
>     >>>>>>>>
>     >>>>>>>> Regards,
>     >>>>>>>> Timo
>     >>>>>>>>
>     >>>>>>>>
>     >>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>     >>>>>>>>> Thanks for publishing the FLIP!
>     >>>>>>>>>
>     >>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>     >>>>>>>>>>     1. How to sort/group keys? What representation of
>     the key
>     >>>>>>>>>> should we
>     >>>>>>>>>>        use? Should we sort on the binary form or should
>     we depend
>     >> on
>     >>>>>>>>>>        Comparators being available.
>     >>>>>>>>> Initially, I suggested to Dawid (in private) to do the
>     >>>>>>>>> sorting/grouping
>     >>>>>>>> by using the binary representation. Then my opinion
>     switched and I
>     >>>>>>>> thought
>     >>>>>>>> we should use TypeComparator/Comparator because that's
>     what the
>     >>>>>>>> DataSet API
>     >>>>>>>> uses. After talking to Stephan, I'm again encouraged in
>     my opinion
>     >>>>>>>> to use
>     >>>>>>>> the binary representation because it means we can
>     eventually get rid
>     >>>>>>>> of the
>     >>>>>>>> TypeComparator interface, which is a bit complicated, and
>     because we
>     >>>>>>>> don't
>     >>>>>>>> need any good order in our sort, we only need the grouping.
>     >>>>>>>>> This comes with some problems, though: we need to ensure
>     that the
>     >>>>>>>> TypeSerializer of the type we're sorting is
>     stable/deterministic.
>     >>>>>>>> Beam has
>     >>>>>>>> infrastructure for this in the form of
>     Coder.verifyDeterministic()
>     >> [1]
>     >>>>>>>> which we don't have right now and should add if we go
>     down this
>     >> path.
>     >>>>>>>>>>     2. Where in the stack should we apply the sorting (this
>     >> rather a
>     >>>>>>>>>>        discussion about internals)
>     >>>>>>>>> Here, I'm gravitating towards the third option of
>     implementing it
>     >>>>>>>>> in the
>     >>>>>>>> layer of the StreamTask, which probably means
>     implementing a custom
>     >>>>>>>> InputProcessor. I think it's best to do it in this layer
>     because we
>     >>>>>>>> would
>     >>>>>>>> not mix concerns of different layers as we would if we
>     implemented
>     >>>>>>>> this as
>     >>>>>>>> a custom StreamOperator. I think this solution is also
>     best when it
>     >>>>>>>> comes
>     >>>>>>>> to multi-input operators.
>     >>>>>>>>>>     3. How should we deal with custom implementations of
>     >>>>>>>>>> StreamOperators
>     >>>>>>>>> I think the cleanest solution would be to go through the
>     complete
>     >>>>>>>> operator lifecycle for every key, because then the
>     watermark would
>     >> not
>     >>>>>>>> oscillate between -Inf and +Inf and we would not break the
>     >> semantical
>     >>>>>>>> guarantees that we gave to operators so far, in that the
>     watermark
>     >> is
>     >>>>>>>> strictly monotonically increasing. However, I don't think
>     this
>     >>>>>>>> solution is
>     >>>>>>>> feasible because it would come with too much overhead. We
>     should
>     >>>>>>>> solve this
>     >>>>>>>> problem via documentation and maybe educate people to not
>     query the
>     >>>>>>>> current
>     >>>>>>>> watermark or not rely on the watermark being monotonically
>     >>>>>>>> increasing in
>     >>>>>>>> operator implementations to allow the framework more
>     freedoms in how
>     >>>>>>>> user
>     >>>>>>>> programs are executed.
>     >>>>>>>>> Aljoscha
>     >>>>>>>>>
>     >>>>>>>>> [1]
>     >>
>     https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>     >>
>

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Kurt Young <yk...@gmail.com>.
I doubt that any sorting algorithm would work knowing only that the keys
are different, without information about which is greater.

Best,
Kurt


On Tue, Sep 8, 2020 at 10:59 PM Dawid Wysakowicz <dw...@apache.org>
wrote:

> Ad. 1
>
> Yes, you are right in principle.
>
> Let me clarify my proposal a bit, though. The proposed sort-style
> execution aims at a generic KeyedProcessFunction where all the
> "aggregations" are actually performed in the user code. It tries to
> improve the performance by removing the need to use RocksDB, e.g.:
>
>     private static final class Summer<K>
>             extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>
>         ....
>
>         @Override
>         public void processElement(
>                 Tuple2<K, Integer> value,
>                 Context ctx,
>                 Collector<Tuple2<K, Integer>> out) throws Exception {
>             if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>                 ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>                 timerRegistered.update(true);
>             }
>             Integer v = counter.value();
>             Integer incomingValue = value.f1;
>             if (v != null) {
>                 v += incomingValue;
>             } else {
>                 v = incomingValue;
>             }
>             counter.update(v);
>         }
>
>         ....
>
>    }
>
> Therefore I don't think the first part of your reply, about separating
> the write and read workloads, applies here. We do not aim to create an
> API that competes with the Table API. We think operations such as joins
> or analytical aggregations should be performed in the Table API.
>
> As for the second part I agree it would be nice to fall back to the
> sorting approach only if a certain threshold of memory in a State
> Backend is used. This has some problems though. We would need a way to
> estimate the size of the occupied memory to tell when the threshold is
> reached. That is not easily doable by default e.g. in a
> MemoryStateBackend, as we do not serialize the values in the state
> backend by default. We would have to add that, but this would add the
> overhead of the serialization.
>
> This proposal aims at the cases where we do have a large state that will
> not fit into memory and where, without the change, users are forced to
> use RocksDB. If the state fits in memory, I agree it will be better to do
> hash-based aggregations, e.g. using the MemoryStateBackend. Therefore I
> think it is important to give users the choice to use one or the other
> approach. We might discuss which approach should be the default for
> RuntimeMode.BATCH proposed in FLIP-134: should it be hash-based with a
> user-configured state backend, or sorting-based with a
> single-key-at-a-time backend? Moreover, we could consider letting users
> choose the sort vs. hash "state backend" per operator. Would that suffice?
>
> Ad. 2
>
> I still think we can just use the first X bytes of the serialized form
> as the normalized key and fall back to comparing full keys on clashes.
> That is because we are not actually interested in a logical order; we
> care only about the "grouping" aspect of the sorting. Therefore I think
> it's enough to compare only part of the full key as the normalized key.
>
> Thanks again for the really nice and thorough feedback!
>
> Best,
>
> Dawid
>
> On 08/09/2020 14:47, Kurt Young wrote:
> > Regarding #1, yes, the state backend is definitely hash-based execution.
> > However, there are some differences from batch hash-based execution.
> > The key difference is *random access & read/write mixed workload*. For
> > example, when using a state backend in streaming execution, one has to
> > mix the read and write operations, and all of them are actually random
> > access. But in a batch hash execution, we could divide the phases into
> > write and read. For example, we can build the hash table first, with
> > only write operations. And once the build is done, we can start to read
> > and trigger the user code.
> > Take the hash aggregation which the blink planner implemented as an
> > example: during the building phase, as long as the hash map fits into
> > memory, we update the accumulators directly in the hash map. Once we
> > run out of memory, we fall back to sort-based execution. It improves
> > the performance a lot if the incoming data can be processed in memory.
> >
> > Regarding #2, IIUC you are actually describing a binary format of the
> > key, not the normalized key which is used in DataSet. I will take
> > String as an example. Suppose we have lots of keys whose lengths are
> > all greater than, let's say, 20. In your proposal, you will encode the
> > whole string in the prefix of your composed data
> > ( <key> + <timestamp> + <record> ). And when you compare records, you
> > will actually compare the *whole* key of the record. The normalized key
> > is fixed-length in this case; IIRC it will take 8 bytes to represent
> > the string. The sorter will store the normalized key and offset in a
> > dedicated array. When doing the sorting, it only sorts this *small*
> > array. If the normalized keys are different, you can immediately tell
> > which is greater from the normalized keys. You only have to compare the
> > full keys if the normalized keys are equal and you know that in this
> > case the normalized key couldn't represent the full key. The reason why
> > DataSet is doing this is that sorting the *small* array is super cache
> > efficient. The idea is borrowed from this paper [1]. Let me know if I
> > missed or misunderstood anything.
> >
> > [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
> > cache-sensitive parallel external sort)
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dw...@apache.org>
> > wrote:
> >
> >> Hey Kurt,
> >>
> >> Thank you for comments!
> >>
> >> Ad. 1 I might have missed something here, but as far as I can see,
> >> using the current execution stack with regular state backends (RocksDB
> >> in particular, if we want to have spilling capabilities) is equivalent
> >> to hash-based execution. I can see a different spilling state backend
> >> implementation in the future, but I think it is not batch specific. Or
> >> am I missing something?
> >>
> >> Ad. 2 Totally agree that normalized keys are important for
> >> performance. I think, though, that TypeComparators are not necessary
> >> for that. Actually, this proposal is heading towards only ever
> >> performing "normalized keys" comparison. I have not included in the
> >> proposal the binary format which we will use for sorting (partially
> >> because I forgot, and partially because I thought it was too much of an
> >> implementation detail). Let me include it here though, as it might
> >> clarify the situation a bit.
> >>
> >> In DataSet, at times we have KeySelectors which extract keys based on
> >> field indices or names. This allows us, in certain situations, to
> >> extract the key from serialized records. In DataStream, by contrast,
> >> the key is always described with a black-box KeySelector, in other
> >> words with a function which extracts a key from a deserialized record.
> >> In turn, there is no way to create a comparator that could compare
> >> records by extracting the key from a serialized record (neither with
> >> nor without key normalization). We suggest that the input for the
> >> sorter will be
> >>
> >> <key> + <timestamp> + <record>
> >>
> >> Without having the key prepended we would have to deserialize the record
> >> for every key comparison.
> >>
> >> Therefore if we agree that we perform binary comparison for keys (which
> >> are always prepended), it is actually equivalent to a DataSet with
> >> TypeComparators that support key normalization.
> >>
> >> Let me know if that is clear, or I have missed something here.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 08/09/2020 03:39, Kurt Young wrote:
> >>> Hi Dawid, thanks for bringing this up. It's really exciting to see
> >>> that batch execution is being introduced in DataStream. From the FLIP,
> >>> it seems we are sticking with a sort-based execution mode (at least
> >>> for now), which will sort the whole input data before any *keyed*
> >>> operation is executed. I have two comments here:
> >>>
> >>> 1. Do we want to introduce hash-based execution in the future? Sort is
> >>> a safe choice but not the best in lots of cases. IIUC we only need to
> >>> make sure that before the framework finishes dealing with one key, the
> >>> operator doesn't see any data belonging to other keys; thus hash-based
> >>> execution would also do the trick. One tricky thing the framework
> >>> might need to deal with is memory constraints and spilling in the hash
> >>> map, but Flink also has some good knowledge about these things.
> >>>
> >>> 2. Going back to sort-based execution and how to sort keys: from my
> >>> experience, the performance of sorting would be one of the most
> >>> important things if we want to achieve good performance of batch
> >>> execution. And normalized keys are actually the key to the performance
> >>> of sorting. If we want to get rid of TypeComparator, I think we still
> >>> need to find a way to bring this back.
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <al...@apache.org>
> >> wrote:
> >>>> Yes, I think we can address the problem of indeterminacy in a separate
> >>>> FLIP because we're already in it.
> >>>>
> >>>> Aljoscha
> >>>>
> >>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
> >>>>> @Seth That's a very good point. I agree that RocksDB has the same
> >>>>> problem. I think we can use the same approach for the sorted shuffles
> >>>>> then. @Aljoscha I agree we should think about making it more
> resilient,
> >>>>> as I guess users might have problems already if they use keys with
> >>>>> non-deterministic binary representation. How do you feel about
> >>>>> addressing that separately purely to limit the scope of this FLIP?
> >>>>>
> >>>>> @Aljoscha I tend to agree with you that the best place to actually
> >> place
> >>>>> the sorting would be in the InputProcessor(s). If there are no more
> >>>>> suggestions in respect to that issue. I'll put this proposal for
> >> voting.
> >>>>> @all Thank you for the feedback so far. I'd like to start a voting
> >>>>> thread on the proposal tomorrow. Therefore I'd appreciate if you
> >> comment
> >>>>> before that, if you still have some outstanding ideas.
> >>>>>
> >>>>> Best,
> >>>>>
> >>>>> Dawid
> >>>>>
> >>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
> >>>>>> Seth is right, I was just about to write that as well. There is a
> >>>>>> problem, though, because some of our TypeSerializers are not
> >>>>>> deterministic even though we use them as if they were. Beam excludes
> >>>>>> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
> >>>>>> pretty sure there is also weirdness going on in our KryoSerializer.
> >>>>>>
> >>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
> >>>>>>> There is already an implicit assumption the TypeSerializer for keys
> >> is
> >>>>>>> stable/deterministic, RocksDB compares keys using their serialized
> >> byte
> >>>>>>> strings. I think this is a non-issue (or at least it's not changing
> >> the
> >>>>>>> status quo).
> >>>>>>>
> >>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <tw...@apache.org>
> >>>> wrote:
> >>>>>>>> +1 for getting rid of the TypeComparator interface and relying
> >>>>>>>> on the serialized representation for grouping.
> >>>>>>>>
> >>>>>>>> Adding a new type to the DataStream API is quite difficult at
> >>>>>>>> the moment due to the many components that are required:
> >>>>>>>> TypeInformation (tries to deal with logical fields for
> >>>>>>>> TypeComparators), TypeSerializer (incl. its snapshot
> >>>>>>>> interfaces), and TypeComparator (with many methods and
> >>>>>>>> internals such as normalized keys etc.).
> >>>>>>>>
> >>>>>>>> If necessary, we can add more simple comparison-related methods to
> >> the
> >>>>>>>> TypeSerializer interface itself in the future (like
> >>>>>>>> TypeSerializer.isDeterministic).
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
> >>>>>>>>> Thanks for publishing the FLIP!
> >>>>>>>>>
> >>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakowicz@apache.org
> >
> >>>>>>>>> wrote:
> >>>>>>>>>>     1. How to sort/group keys? What representation of the key
> >>>>>>>>>> should we
> >>>>>>>>>>        use? Should we sort on the binary form or should we
> depend
> >> on
> >>>>>>>>>>        Comparators being available.
> >>>>>>>>> Initially, I suggested to Dawid (in private) to do the
> >>>>>>>>> sorting/grouping
> >>>>>>>> by using the binary representation. Then my opinion switched and I
> >>>>>>>> thought
> >>>>>>>> we should use TypeComparator/Comparator because that's what the
> >>>>>>>> DataSet API
> >>>>>>>> uses. After talking to Stephan, I'm again encouraged in my opinion
> >>>>>>>> to use
> >>>>>>>> the binary representation because it means we can eventually get
> rid
> >>>>>>>> of the
> >>>>>>>> TypeComparator interface, which is a bit complicated, and because
> we
> >>>>>>>> don't
> >>>>>>>> need any good order in our sort, we only need the grouping.
> >>>>>>>>> This comes with some problems, though: we need to ensure that the
> >>>>>>>> TypeSerializer of the type we're sorting is stable/deterministic.
> >>>>>>>> Beam has
> >>>>>>>> infrastructure for this in the form of Coder.verifyDeterministic()
> >> [1]
> >>>>>>>> which we don't have right now and should add if we go down this
> >> path.
> >>>>>>>>>>     2. Where in the stack should we apply the sorting (this
> >> rather a
> >>>>>>>>>>        discussion about internals)
> >>>>>>>>> Here, I'm gravitating towards the third option of implementing it
> >>>>>>>>> in the
> >>>>>>>> layer of the StreamTask, which probably means implementing a
> custom
> >>>>>>>> InputProcessor. I think it's best to do it in this layer because
> we
> >>>>>>>> would
> >>>>>>>> not mix concerns of different layers as we would if we implemented
> >>>>>>>> this as
> >>>>>>>> a custom StreamOperator. I think this solution is also best when
> it
> >>>>>>>> comes
> >>>>>>>> to multi-input operators.
> >>>>>>>>>>     3. How should we deal with custom implementations of
> >>>>>>>>>> StreamOperators
> >>>>>>>>> I think the cleanest solution would be to go through the complete
> >>>>>>>> operator lifecycle for every key, because then the watermark would
> >> not
> >>>>>>>> oscillate between -Inf and +Inf and we would not break the
> >> semantical
> >>>>>>>> guarantees that we gave to operators so far, in that the watermark
> >> is
> >>>>>>>> strictly monotonically increasing. However, I don't think this
> >>>>>>>> solution is
> >>>>>>>> feasible because it would come with too much overhead. We should
> >>>>>>>> solve this
> >>>>>>>> problem via documentation and maybe educate people to not query
> the
> >>>>>>>> current
> >>>>>>>> watermark or not rely on the watermark being monotonically
> >>>>>>>> increasing in
> >>>>>>>> operator implementations to allow the framework more freedoms in
> how
> >>>>>>>> user
> >>>>>>>> programs are executed.
> >>>>>>>>> Aljoscha
> >>>>>>>>>
> >>>>>>>>> [1]
> >>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
> >>
>
>

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Dawid Wysakowicz <dw...@apache.org>.
> Ok, got your point now. I agree that it makes more sense to
> make StateBackend return a contract instead of a particular
> implementation. How about we name the new interface as
> `CheckpointableKeyedStateBackend`? We could make
> `BoundedStreamStateBackend` implement
> `CheckpointableKeyedStateBackend` but without checkpoint related
> operations yet, whereas reserving the possibility that the bounded
> stream also supports checkpoint in future. What do you think?
Sounds good to me. Will update the FLIP with the new name.

On 18/09/2020 15:31, Yu Li wrote:
> /bq. The problem is that I could not use this "state backend" in a
> StreamOperator./
> Ok, got your point now. I agree that it makes more sense to
> make StateBackend return a contract instead of a particular
> implementation. How about we name the new interface as
> `CheckpointableKeyedStateBackend`? We could make
> `BoundedStreamStateBackend` implement
> `CheckpointableKeyedStateBackend` but without checkpoint related
> operations yet, whereas reserving the possibility that the bounded
> stream also supports checkpoint in future. What do you think?
>
> /bq. Correct, the goal though is not to outperform the
> HeapStateBackend. The single key state backend requires sorted inputs
> which come with a price. The main goal is to outperform
> RocksDBStateBackend, which is necessary for large states./
> Personally I think the main benefit of introducing a bounded stream
> specific state backend is that we could remove the data after
> processing a key, thus reducing the cost of state storage a lot,
> rather than the routine performance of state processing (smile).
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Fri, 18 Sep 2020 at 20:48, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>
>>     ===============================================
>>     class BoundedStreamInternalStateBackend<K> implements
>>             KeyedStateBackend<K>,
>>             SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>>             Closeable,
>>             CheckpointListener {
>>     ===============================================
>     The problem is that I could not use this "state backend" in a
>     StreamOperator. The goal of this effort is that it is mostly
>     transparent to all the implementations of StreamOperator(s). Right
>     now StreamOperator retrieves AbstractKeyedStateBackend through
>     StreamOperatorContext which instantiates it in
>     StreamTaskInitializer etc. The problem is that a big chunk of the
>     current code base uses the AbstractKeyedStateBackend, whereas it
>     really just needs an interface not that particular implementation.
>     The change is really only about separating the contract
>     (InternalKeyedStateBackend) from the implementation
>     (AbstractKeyedStateBackend). My thinking is that it is only an
>     approach to fix a mistake of the past that StateBackend returns a
>     particular implementation rather than a contract.
>
>     I do agree I don't need the `SnapshotStrategy` and
>     `CheckpointListener` interfaces. The thing though is that the
>     runtime expects those contracts from an AbstractKeyedStateBackend.
>
>     BTW, if you'd like to see what this change really looks like,
>     you can check the PR I already opened for it:
>     https://github.com/apache/flink/pull/13405/files
>
>>     Checking the FLIP more closely I found below description: "With a
>>     high number of keys it (HeapStateBackend) suffers a significant
>>     penalty and becomes even less performant for that particular case
>>     than the sorting approach", does it mean "HeapStateBackend"
>>     outperformed "SingleKeyStateBackend" when the number of keys is
>>     relatively small
>     Correct, the goal though is not to outperform the
>     HeapStateBackend. The single key state backend requires sorted
>     inputs which come with a price. The main goal is to outperform
>     RocksDBStateBackend, which is necessary for large states.
>
>>     Thanks for the summary. I think it's more specific and could help
>>     readers to better understand why we cannot use
>>     HeapKeyedStateBackend directly, than the single line description
>>     "when the StateBackend observes a new incoming key it will reset
>>     all acquired state objects so far". What do you think?
>     Sure, I can add it to the document.
>
>     Best,
>
>     Dawid
>
>     On 18/09/2020 14:29, Yu Li wrote:
>>     Thanks for the clarification Dawid. Some of my thoughts:
>>
>>     /bq. The results are times for end-to-end execution of a job.
>>     Therefore the sorting part is included. The actual target of the
>>     replacement is RocksDB, which does the serialization and key
>>     bytes comparison as well./
>>     I see. Checking the FLIP more closely I found below description:
>>     "With a high number of keys it (HeapStateBackend) suffers a
>>     significant penalty and becomes even less performant for that
>>     particular case than the sorting approach", does it mean
>>     "HeapStateBackend" outperformed "SingleKeyStateBackend" when the
>>     number of keys is relatively small? The micro-benchmark of
>>     ValueState removes the key shuffling phase, so its result could
>>     be self-explained.
>>
>>     About `InternalKeyedStateBackend`, let me rephrase my question:
>>     why don't we add the new state backend like below instead of
>>     adding a new interface (and IMHO there's no need to implement the
>>     `SnapshotStrategy` and `CheckpointListener` interfaces since it
>>     doesn't support checkpoint)? Reserved for adding more internal
>>     state backends in future?
>>     ===============================================
>>     class BoundedStreamInternalStateBackend<K> implements
>>             KeyedStateBackend<K>,
>>             SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>>             Closeable,
>>             CheckpointListener {
>>     ===============================================
>>
>>     /bq. Let me though quickly summarize and if you find it useful I
>>     can add it to the FLIP itself./
>>     Thanks for the summary. I think it's more specific and could help
>>     readers to better understand why we cannot use
>>     HeapKeyedStateBackend directly, than the single line description
>>     "when the StateBackend observes a new incoming key it will reset
>>     all acquired state objects so far". What do you think?
>>
>>     Thanks.
>>
>>     Best Regards,
>>     Yu
>>
>>
>>     On Thu, 17 Sep 2020 at 23:38, Dawid Wysakowicz
>>     <dwysakowicz@apache.org> wrote:
>>
>>         Thanks for the comments Yu.
>>
>>         > First of all, for the performance testing result, I'm
>>         wondering whether the
>>         > sorting cost is counted in the result for both DataSet and
>>         refined
>>         > DataStream implementations. I could think of the saving of
>>         hash computation
>>         > and final iteration to emit the word-count result
>>         (processing a key at a
>>         > time could save such iteration), but not sure whether these
>>         cost savings
>>         > are at the same grade of comparing the key bytes.
>>         The results are times for end-to-end execution of a job.
>>         Therefore the
>>         sorting part is included. The actual target of the replacement is
>>         RocksDB, which does the serialization and key bytes
>>         comparison as well.
>>         On top of that it adds all the RocksDB bookkeeping.
>>
>>         > However, I'm not fully convinced to introduce a new
>>         > `InternalKeyedStateBackend` interface. I agree that we
>>         don't need to take
>>         > the overhead of `AbstractKeyedStateBackend` since we don't
>>         plan to support
>>         > checkpoint for now, but why don't we directly write a state
>>         backend
>>         > implementation for bounded stream? Or are we planning to
>>         introduce more
>>         > internal state backends in future? What's more, the current
>>         design of
>>         > `InternalKeyedStateBackend` in the FLIP document seems to
>>         be extending as
>>         > many interfaces as `AbstractKeyedStateBackend`
>>         implements, which I guess
>>         > is a typo.
>>         Maybe I was not clear enough about the change. This change
>>         does not
>>         "strip" the AbstractKeyedStateBackend of any functionalities.
>>         My intent
>>         is not to remove any methods of the
>>         AbstractKeyedStateBackend. The
>>         problem here is that the AbstractKeyedStateBackend is an
>>         abstract class
>>         (duh ;)), which does have some predefined implementation.
>>         Moreover it
>>         requires objects such as InternalKeyContext, CloseableRegistry
>>         etc. to be
>>         constructed, which we don't need/want e.g. in the single key
>>         state
>>         backend. My intention here is to make the StateBackend return
>>         only pure
>>         interfaces. (AbstractKeyedStateBackend is the only
>>         non-interface that
>>         StateBackend returns). In other words I just want to make
>>         AbstractKeyedStateBackend a proper interface. It is not a
>>         typo that
>>         InternalKeyedStateBackend extends the same interfaces as
>>         AbstractKeyedStateBackend does.
>>
>>         > Thirdly, I suggest we name the special state backend as
>>         > `BoundedStreamInternalStateBackend`. And from our existing
>>         javadoc of
>>         > `StateBackend` it actually cannot be called a complete
>>         state backend...: "A
>>         > State Backend defines how the state of a streaming
>>         application is stored
>>         > and checkpointed".
>>         Thanks for the suggestion. Sure I can use that name. Yes I do
>>         agree it
>>         is not a full fledged StateBackend. I do want it to be an
>>         internal
>>         class, that is never used explicitly by users.
>>
>>         > Lastly, I didn't find a detailed design of the
>>         "SingleKeyStateBackend" in
>>         > the FLIP,
>>         I did not put it into the design, because 1) I found it
>>         internal. It
>>         does not touch any public facing interfaces. 2) It is rather
>>         straightforward. Let me though quickly summarize and if you
>>         find it
>>         useful I can add it to the FLIP itself.
>>
>>         > as how to detect the key switching
>>         That is rather straightforward. The state backend works only
>>         with the
>>         assumption that the keys are sorted/grouped together. We keep the
>>         current key and in the setCurrentKey we check if the new key is
>>         different than the current one. Side note: yes, custom user
>>         operators
>>         which call setCurrentKey explicitly might not work in this setup.
>>
>>         > remove the data (especially in the non-windowing
>>         > case), etc.
>>         We only ever keep a single value for a state object. Therefore
>>         ValueState is a very thin wrapper for a value, MapState for a
>>         HashMap,
>>         ListState for a List etc. When the key changes we simply set
>>         the wrapped
>>         value/map/list to null.
>>
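The key-switch detection and state reset described above can be sketched as follows. This is a minimal illustration in plain Java, not the state backend from the FLIP or the PR: the classes `SingleKeyValueState` and `SingleKeyStateDemo` and the helper `sumPerKey` are hypothetical names, and the sketch assumes the input is already sorted/grouped by key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical thin wrapper: holds the value of exactly one key at a time. */
final class SingleKeyValueState<K, V> {
    private K currentKey;
    private V value;

    /** Switches to newKey; drops the wrapped value if the key changed. Returns true on a change. */
    boolean setCurrentKey(K newKey) {
        if (Objects.equals(currentKey, newKey)) {
            return false;
        }
        currentKey = newKey;
        value = null; // state of the previous key is simply forgotten
        return true;
    }

    K currentKey() {
        return currentKey;
    }

    V value() {
        return value;
    }

    void update(V newValue) {
        value = newValue;
    }
}

/** Hypothetical driver: sums values per key and emits "key=sum" when the key switches. */
final class SingleKeyStateDemo {
    private SingleKeyStateDemo() {}

    static List<String> sumPerKey(String[] keys, int[] values) {
        SingleKeyValueState<String, Integer> state = new SingleKeyValueState<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            String previousKey = state.currentKey();
            Integer previousSum = state.value();
            // Emit the finished key before its state is reset.
            if (state.setCurrentKey(keys[i]) && previousSum != null) {
                out.add(previousKey + "=" + previousSum);
            }
            Integer sum = state.value();
            state.update(sum == null ? values[i] : sum + values[i]);
        }
        if (state.value() != null) {
            out.add(state.currentKey() + "=" + state.value());
        }
        return out;
    }
}
```

If the input is not grouped, the same key is emitted more than once with partial sums. This is the same reason custom operators that call setCurrentKey with arbitrary keys might not work in this setup.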
>>         I hope this clarifies a few things. Let me know if you have
>>         any questions.
>>
>>         Best,
>>
>>         Dawid
>>
>>         On 17/09/2020 15:28, Yu Li wrote:
>>         > Hi all,
>>         >
>>         > Sorry for being late to the discussion, but I just noticed
>>         there are some
>>         > state backend related changes proposed in this FLIP, so
>>         would like to share
>>         > my two cents.
>>         >
>>         > First of all, for the performance testing result, I'm
>>         wondering whether the
>>         > sorting cost is counted in the result for both DataSet and
>>         refined
>>         > DataStream implementations. I could think of the saving of
>>         hash computation
>>         > and final iteration to emit the word-count result
>>         (processing a key at a
>>         > time could save such iteration), but not sure whether these
>>         cost savings
>>         > are at the same grade of comparing the key bytes.
>>         >
>>         > Regardless of the performance result, I agree that the
>>         capability of
>>         > removing the data after processing a key could prominently
>>         reduce the space
>>         > required by state, so introducing a new state backend for
>>         bounded stream
>>         > makes sense.
>>         >
>>         > However, I'm not fully convinced to introduce a new
>>         > `InternalKeyedStateBackend` interface. I agree that we
>>         don't need to take
>>         > the overhead of `AbstractKeyedStateBackend` since we don't
>>         plan to support
>>         > checkpoint for now, but why don't we directly write a state
>>         backend
>>         > implementation for bounded stream? Or are we planning to
>>         introduce more
>>         > internal state backends in future? What's more, the current
>>         design of
>>         > `InternalKeyedStateBackend` in the FLIP document seems to
>>         be extending as
>>         > many interfaces as `AbstractedKeyedStateBackend`
>>         implements, which I guess
>>         > is a typo.
>>         >
>>         > Thirdly, I suggest we name the special state backend as
>>         > `BoundedStreamInternalStateBackend`. And from our existing
>>         javadoc of
>>         > `StateBackend` it actually cannot be called a complete
>>         state backend...: "A
>>         > State Backend defines how the state of a streaming
>>         application is stored
>>         > and checkpointed".
>>         >
>>         > Lastly, I didn't find a detailed design of the
>>         "SingleKeyStateBackend" in
>>         > the FLIP, and suggest we write the key design down, such as
>>         how to detect
>>         > the key switching and remove the data (especially in the
>>         non-windowing
>>         > case), etc.
>>         >
>>         > Thanks.
>>         >
>>         > Best Regards,
>>         > Yu
>>         >
>>         >
>>         > On Wed, 9 Sep 2020 at 17:18, Kurt Young <ykt836@gmail.com> wrote:
>>         >
>>         >> Yes, I didn't intend to block this FLIP, and some of the
>>         comments are
>>         >> actually implementation details.
>>         >> And all of them are handled internally, not visible to
>>         users, thus we can
>>         >> also change or improve them
>>         >> in the future.
>>         >>
>>         >> Best,
>>         >> Kurt
>>         >>
>>         >>
>>         >> On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek <aljoscha@apache.org> wrote:
>>         >>
>>         >>> I think Kurt's concerns/comments are very valid and we
>>         need to implement
>>         >>> such things in the future. However, I also think that we
>>         need to get
>>         >>> started somewhere and I think what's proposed in this
>>         FLIP is a good
>>         >>> starting point that we can build on. So we should not get
>>         paralyzed by
>>         >>> thinking too far ahead into the future. Does that make sense?
>>         >>>
>>         >>> Best,
>>         >>> Aljoscha
>>         >>>
>>         >>> On 08.09.20 16:59, Dawid Wysakowicz wrote:
>>         >>>> Ad. 1
>>         >>>>
>>         >>>> Yes, you are right in principle.
>>         >>>>
>>         >>>> Let me though clarify my proposal a bit. The proposed
>>         sort-style
>>         >>>> execution aims at a generic KeyedProcessFunction where
>>         all the
>>         >>>> "aggregations" are actually performed in the user code.
>>         It tries to
>>         >>>> improve the performance by actually removing the need to
>>         use RocksDB
>>         >>> e.g.:
>>         >>>>      private static final class Summer<K>
>>         >>>>              extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>>         >>>>
>>         >>>>          ....
>>         >>>>
>>         >>>>          @Override
>>         >>>>          public void processElement(
>>         >>>>                  Tuple2<K, Integer> value,
>>         >>>>                  Context ctx,
>>         >>>>                  Collector<Tuple2<K, Integer>> out) throws Exception {
>>         >>>>              if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>>         >>>>                  ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>>         >>>>                  timerRegistered.update(true);
>>         >>>>              }
>>         >>>>              Integer v = counter.value();
>>         >>>>              Integer incomingValue = value.f1;
>>         >>>>              if (v != null) {
>>         >>>>                  v += incomingValue;
>>         >>>>              } else {
>>         >>>>                  v = incomingValue;
>>         >>>>              }
>>         >>>>              counter.update(v);
>>         >>>>          }
>>         >>>>
>>         >>>>          ....
>>         >>>>
>>         >>>>     }
>>         >>>>
>>         >>>> Therefore I don't think the first part of your reply
>>         with separating
>>         >> the
>>         >>>> write and read workload applies here. We do not aim to
>>         create a
>>         >>>> competing API with the Table API. We think operations
>>         such as joins or
>>         >>>> analytical aggregations should be performed in Table API.
>>         >>>>
>>         >>>> As for the second part I agree it would be nice to fall
>>         back to the
>>         >>>> sorting approach only if a certain threshold of memory
>>         in a State
>>         >>>> Backend is used. This has some problems though. We would
>>         need a way to
>>         >>>> estimate the size of the occupied memory to tell when
>>         the threshold is
>>         >>>> reached. That is not easily doable by default e.g. in a
>>         >>>> MemoryStateBackend, as we do not serialize the values in
>>         the state
>>         >>>> backend by default. We would have to add that, but this
>>         would add the
>>         >>>> overhead of the serialization.
>>         >>>>
>>         >>>> This proposal aims at the cases where we do have a large
>>         state that
>>         >> will
>>         >>>> not fit into the memory and without the change users are
>>         forced to use
>>         >>>> RocksDB. If the state fits in memory I agree it will be
>>         better to do
>>         >>>> hash-based aggregations e.g. using the
>>         MemoryStateBackend. Therefore I
>>         >>>> think it is important to give users the choice to use
>>         one or the other
>>         >>>> approach. We might discuss which approach should be the
>>         default for
>>         >>>> RuntimeMode.BATCH proposed in FLIP-134. Should it be
>>         hash-based with
>>         >>>> user configured state backend or sorting-based with a
>>         single key at a
>>         >>>> time backend. Moreover we could think if we should let
>>         users choose the
>>         >>>> sort vs hash "state backend" per operator. Would that
>>         suffice?
>>         >>>>
>>         >>>> Ad. 2
>>         >>>>
>>         >>>> I still think we can just use the first X bytes of the
>>         serialized form
>>         >>>> as the normalized key and fallback to comparing full
>>         keys on clashes.
>>         >> It
>>         >>>> is because we are actually not interested in a logical
>>         order, but we
>>         >>>> care only about the "grouping" aspect of the sorting.
>>         Therefore I think
>>         >>>> it's enough to compare only parts of the full key as the
>>         normalized key.
>>         >>>>
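The idea of comparing only the first X bytes and falling back to the full key on clashes can be sketched as below. This is a rough illustration under stated assumptions, not the sorter proposed in the FLIP: `PrefixKeyComparator` and `sortKeys` are hypothetical names, keys are assumed to be available as byte arrays (here UTF-8 strings stand in for serialized keys), and `Arrays.compareUnsigned` requires Java 9 or later.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;

/** Hypothetical comparator: first X bytes act as the normalized key, full key breaks ties. */
final class PrefixKeyComparator implements Comparator<byte[]> {
    private final int prefixLength;

    PrefixKeyComparator(int prefixLength) {
        if (prefixLength < 0) {
            throw new IllegalArgumentException("prefixLength must not be negative");
        }
        this.prefixLength = prefixLength;
    }

    @Override
    public int compare(byte[] left, byte[] right) {
        int leftEnd = Math.min(left.length, prefixLength);
        int rightEnd = Math.min(right.length, prefixLength);
        // Cheap path: different prefixes already decide the order.
        int byPrefix = Arrays.compareUnsigned(left, 0, leftEnd, right, 0, rightEnd);
        if (byPrefix != 0) {
            return byPrefix;
        }
        // Clash on the prefix: compare the full serialized keys.
        return Arrays.compareUnsigned(left, right);
    }

    /** Sorts string keys by their UTF-8 bytes so that equal keys end up adjacent. */
    static String[] sortKeys(String[] keys, int prefixLength) {
        byte[][] encoded = new byte[keys.length][];
        for (int i = 0; i < keys.length; i++) {
            encoded[i] = keys[i].getBytes(StandardCharsets.UTF_8);
        }
        Arrays.sort(encoded, new PrefixKeyComparator(prefixLength));
        String[] sorted = new String[keys.length];
        for (int i = 0; i < keys.length; i++) {
            sorted[i] = new String(encoded[i], StandardCharsets.UTF_8);
        }
        return sorted;
    }
}
```

The resulting order has no logical meaning for the key type. It only guarantees that records with byte-identical keys are adjacent, which is the grouping property the paragraph above relies on.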
>>         >>>> Thanks again for the really nice and thorough feedback!
>>         >>>>
>>         >>>> Best,
>>         >>>>
>>         >>>> Dawid
>>         >>>>
>>         >>>> On 08/09/2020 14:47, Kurt Young wrote:
>>         >>>>> Regarding #1, yes the state backend is definitely
>>         hash-based
>>         >> execution.
>>         >>>>> However there are some differences compared to
>>         >>>>> batch hash-based execution. The key difference is *random access &
>>         >>>>> read/write mixed workload*. For example, by using a
>>         >>>>> state backend in streaming execution, one has to mix
>>         the read and
>>         >> write
>>         >>>>> operations and all of them are actually random
>>         >>>>> access. But in a batch hash execution, we could divide
>>         the phases into
>>         >>>>> write and read. For example, we can build the
>>         >>>>> hash table first, with only write operations. And once
>>         the build is
>>         >>> done,
>>         >>>>> we can start to read and trigger the user codes.
>>         >>>>> Take hash aggregation which blink planner implemented
>>         as an example,
>>         >>> during
>>         >>>>> building phase, as long as the hash map
>>         >>>>> could fit into memory, we will update the accumulators
>>         directly in the
>>         >>> hash
>>         >>>>> map. And once we are running out of memory,
>>         >>>>> we then fall back to sort based execution. It improves the
>>         >> performance a
>>         >>>>> lot if the incoming data can be processed in
>>         >>>>> memory.
>>         >>>>>
>>         >>>>> Regarding #2, IIUC you are actually describing a binary
>>         format of key,
>>         >>> not
>>         >>>>> normalized key which is used in DataSet. I will
>>         >>>>> take String for example. If we have lots of keys with
>>         length all
>>         >> greater
>>         >>>>> than, let's say 20. In your proposal, you will encode
>>         >>>>> the whole string in the prefix of your composed data (
>>         <key> +
>>         >>> <timestamp>
>>         >>>>> + <record> ). And when you compare
>>         >>>>> records, you will actually compare the *whole* key of
>>         the record. For
>>         >>>>> normalized key, it's fixed-length in this case, IIRC it
>>         will
>>         >>>>> take 8 bytes to represent the string. And the sorter
>>         will store the
>>         >>>>> normalized key and offset in a dedicated array. When doing
>>         >>>>> the sorting, it only sorts this *small* array. If the
>>         normalized keys
>>         >>> are
>>         >>>>> different, you could immediately tell which is greater from
>>         >>>>> normalized keys. You only have to compare the full keys
>>         if the
>>         >>> normalized
>>         >>>>> keys are equal and you know in this case the normalized
>>         >>>>> key couldn't represent the full key. The reason why
>>         DataSet is doing
>>         >>> this
>>         >>>>> is it's super cache efficient by sorting the *small* array.
>>         >>>>> The idea is borrowed from this paper [1]. Let me know
>>         if I missed or
>>         >>>>> misunderstood anything.
>>         >>>>>
>>         >>>>> [1] https://dl.acm.org/doi/10.5555/615232.615237
>>         (AlphaSort: a
>>         >>>>> cache-sensitive parallel external sort)
>>         >>>>>
>>         >>>>> Best,
>>         >>>>> Kurt
>>         >>>>>
>>         >>>>>
>>         >>>>> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>>         >>>>>
>>         >>>>>> Hey Kurt,
>>         >>>>>>
>>         >>>>>> Thank you for comments!
>>         >>>>>>
>>         >>>>>> Ad. 1 I might have missed something here, but as far
>>         as I see it,
>>         >>>>>> using the current execution stack with regular state
>>         backends
>>         >> (RocksDB
>>         >>>>>> in particular if we want to have spilling
>>         capabilities) is equivalent
>>         >>> to
>>         >>>>>> hash-based execution. I can see a different spilling
>>         state backend
>>         >>>>>> implementation in the future, but I think it is not
>>         batch specific. Or
>>         >>> am
>>         >>>>>> I missing something?
>>         >>>>>>
>>         >>>>>> Ad. 2 Totally agree that normalized keys are important
>>         to the
>>         >>>>>> performance. I think though TypeComparators are not a
>>         necessity to
>>         >> have
>>         >>>>>> that. Actually  this proposal is heading towards only
>>         ever performing
>>         >>>>>> "normalized keys" comparison. I have not included in
>>         the proposal the
>>         >>>>>> binary format which we will use for sorting (partially
>>         because I
>>         >>> forgot,
>>         >>>>>> and partially because I thought it was too much of an
>>         implementation
>>         >>>>>> detail). Let me include it here though, as it might
>>         clear the
>>         >> situation
>>         >>>>>> a bit here.
>>         >>>>>>
>>         >>>>>> In DataSet, at times we have KeySelectors which
>>         extract keys based on
>>         >>>>>> field indices or names. In certain situations this
>>         makes it possible to extract
>>         >> the
>>         >>>>>> key from serialized records. Compared to DataSet, in
>>         DataStream, the
>>         >>> key
>>         >>>>>> is always described with a black-box KeySelector, or
>>         differently
>>         >> with a
>>         >>>>>> function which extracts a key from a deserialized
>>         record.  In turn
>>         >>> there
>>         >>>>>> is no way to create a comparator that could compare
>>         records by
>>         >>>>>> extracting the key from a serialized record (neither
>>         with, nor
>>         >> without
>>         >>>>>> key normalization). We suggest that the input for the
>>         sorter will be
>>         >>>>>>
>>         >>>>>> <key> + <timestamp> + <record>
>>         >>>>>>
>>         >>>>>> Without having the key prepended we would have to
>>         deserialize the
>>         >>> record
>>         >>>>>> for every key comparison.
>>         >>>>>>
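A possible encoding of the `<key> + <timestamp> + <record>` layout is sketched below. The actual binary format is not specified in this thread, so every detail here is an assumption: the class `SortRecordLayout`, its method names, the 4-byte length prefix in front of the key, and the 8-byte big-endian timestamp are all hypothetical. The point is only that keys can be compared directly on the composed bytes without deserializing the record.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical layout: [key length (4 bytes)][key][timestamp (8 bytes)][record]. */
final class SortRecordLayout {
    private SortRecordLayout() {}

    /** Prepends the serialized key and the timestamp to the serialized record. */
    static byte[] compose(byte[] key, long timestamp, byte[] record) {
        ByteBuffer buffer =
                ByteBuffer.allocate(Integer.BYTES + key.length + Long.BYTES + record.length);
        buffer.putInt(key.length);
        buffer.put(key);
        buffer.putLong(timestamp);
        buffer.put(record);
        return buffer.array();
    }

    /** Compares only the key bytes of two composed entries; the record part is never read. */
    static int compareKeys(byte[] left, byte[] right) {
        int leftKeyLength = ByteBuffer.wrap(left).getInt();
        int rightKeyLength = ByteBuffer.wrap(right).getInt();
        return Arrays.compareUnsigned(
                left, Integer.BYTES, Integer.BYTES + leftKeyLength,
                right, Integer.BYTES, Integer.BYTES + rightKeyLength);
    }

    /** Reads the timestamp that follows the key. */
    static long timestampOf(byte[] composed) {
        ByteBuffer buffer = ByteBuffer.wrap(composed);
        int keyLength = buffer.getInt();
        return buffer.getLong(Integer.BYTES + keyLength);
    }
}
```

With a black-box KeySelector the key is extracted once, before sorting, and stored in front of the record, so the sorter itself never needs to call the selector or a deserializer.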
>>         >>>>>> Therefore if we agree that we perform binary
>>         comparison for keys
>>         >> (which
>>         >>>>>> are always prepended), it is actually equivalent to a
>>         DataSet with
>>         >>>>>> TypeComparators that support key normalization.
>>         >>>>>>
>>         >>>>>> Let me know if that is clear, or I have missed
>>         something here.
>>         >>>>>>
>>         >>>>>> Best,
>>         >>>>>>
>>         >>>>>> Dawid
>>         >>>>>>
>>         >>>>>> On 08/09/2020 03:39, Kurt Young wrote:
>>         >>>>>>> Hi Dawid, thanks for bringing this up, it's really
>>         exciting to see
>>         >>> that
>>         >>>>>>> batch execution is introduced in DataStream. From the
>>         FLIP, it seems
>>         >>>>>>> we are sticking with sort based execution mode (at
>>         least for now),
>>         >>> which
>>         >>>>>>> will sort the whole input data before any *keyed*
>>         operation is
>>         >>>>>>> executed. I have two comments here:
>>         >>>>>>>
>>         >>>>>>> 1. Do we want to introduce hash-based execution in
>>         the future? Sort
>>         >>> is a
>>         >>>>>>> safe choice but not the best in lots of cases. IIUC
>>         we only need
>>         >>>>>>> to make sure that before the framework finishes
>>         dealing with one
>>         >> key,
>>         >>> the
>>         >>>>>>> operator doesn't see any data belonging to other
>>         keys, thus
>>         >>>>>>> hash-based execution would also do the trick. One
>>         tricky thing the
>>         >>>>>>> framework might need to deal with is memory
>>         constraint and spilling
>>         >>>>>>> in the hash map, but Flink also has some good
>>         knowledge about this
>>         >>>>>> stuff.
>>         >>>>>>> 2. Going back to sort-based execution and how to sort
>>         keys. From my
>>         >>>>>>> experience, the performance of sorting would be one
>>         of the most
>>         >> important
>>         >>>>>>> things if we want to achieve good performance of
>>         batch execution.
>>         >> And
>>         >>>>>>> normalized keys are actually the key to the
>>         performance of sorting.
>>         >>>>>>> If we want to get rid of TypeComparator, I think we
>>         still need to
>>         >>> find a
>>         >>>>>>> way to introduce this back.
>>         >>>>>>>
>>         >>>>>>> Best,
>>         >>>>>>> Kurt
>>         >>>>>>>
>>         >>>>>>>
>>         >>>>>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <
>>         >> aljoscha@apache.org <ma...@apache.org>>
>>         >>>>>> wrote:
>>         >>>>>>>> Yes, I think we can address the problem of
>>         indeterminacy in a
>>         >>> separate
>>         >>>>>>>> FLIP because we're already in it.
>>         >>>>>>>>
>>         >>>>>>>> Aljoscha
>>         >>>>>>>>
>>         >>>>>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>>         >>>>>>>>> @Seth That's a very good point. I agree that
>>         RocksDB has the same
>>         >>>>>>>>> problem. I think we can use the same approach for
>>         the sorted
>>         >>> shuffles
>>         >>>>>>>>> then. @Aljoscha I agree we should think about
>>         making it more
>>         >>> resilient,
>>         >>>>>>>>> as I guess users might have problems already if
>>         they use keys with
>>         >>>>>>>>> non-deterministic binary representation. How do you
>>         feel about
>>         >>>>>>>>> addressing that separately purely to limit the
>>         scope of this FLIP?
>>         >>>>>>>>>
>>         >>>>>>>>> @Aljoscha I tend to agree with you that the best
>>         place to actually
>>         >>>>>> place
>>         >>>>>>>>> the sorting would be in the InputProcessor(s). If
>>         there are no
>>         >> more
>>         >>>>>>>>> suggestions with respect to that issue, I'll put this
>>         proposal for
>>         >>>>>> voting.
>>         >>>>>>>>> @all Thank you for the feedback so far. I'd like to
>>         start a voting
>>         >>>>>>>>> thread on the proposal tomorrow. Therefore I'd
>>         appreciate if you
>>         >>>>>> comment
>>         >>>>>>>>> before that, if you still have some outstanding ideas.
>>         >>>>>>>>>
>>         >>>>>>>>> Best,
>>         >>>>>>>>>
>>         >>>>>>>>> Dawid
>>         >>>>>>>>>
>>         >>>>>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>>         >>>>>>>>>> Seth is right, I was just about to write that as
>>         well. There is a
>>         >>>>>>>>>> problem, though, because some of our
>>         TypeSerializers are not
>>         >>>>>>>>>> deterministic even though we use them as if they
>>         were. Beam
>>         >>> excludes
>>         >>>>>>>>>> the FloatCoder, for example, and the AvroCoder in
>>         certain cases.
>>         >>> I'm
>>         >>>>>>>>>> pretty sure there is also weirdness going on in our
>>         >> KryoSerializer.
>>         >>>>>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>>         >>>>>>>>>>> There is already an implicit assumption the
>>         TypeSerializer for
>>         >>> keys
>>         >>>>>> is
>>         >>>>>>>>>>> stable/deterministic, RocksDB compares keys using
>>         their
>>         >> serialized
>>         >>>>>> byte
>>         >>>>>>>>>>> strings. I think this is a non-issue (or at least
>>         it's not
>>         >>> changing
>>         >>>>>> the
>>         >>>>>>>>>>> status quo).
>>         >>>>>>>>>>>
>>         >>>>>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther
>>         <twalthr@apache.org <ma...@apache.org>
>>         >>>>>>>> wrote:
>>         >>>>>>>>>>>> +1 for getting rid of the TypeComparator
>>         interface and rely on
>>         >>> the
>>         >>>>>>>>>>>> serialized representation for grouping.
>>         >>>>>>>>>>>>
>>         >>>>>>>>>>>> Adding a new type to DataStream API is quite
>>         difficult at the
>>         >>> moment
>>         >>>>>>>>>>>> due
>>         >>>>>>>>>>>> to too many components that are required:
>>         TypeInformation
>>         >> (tries
>>         >>> to
>>         >>>>>>>>>>>> deal
>>         >>>>>>>>>>>> with logical fields for TypeComparators),
>>         TypeSerializer (incl.
>>         >>> its
>>         >>>>>>>>>>>> snapshot interfaces), and TypeComparator (with
>>         many methods and
>>         >>>>>>>>>>>> internals such as normalized keys etc.).
>>         >>>>>>>>>>>>
>>         >>>>>>>>>>>> If necessary, we can add more simple
>>         comparison-related methods
>>         >>> to
>>         >>>>>> the
>>         >>>>>>>>>>>> TypeSerializer interface itself in the future (like
>>         >>>>>>>>>>>> TypeSerializer.isDeterministic).
>>         >>>>>>>>>>>>
>>         >>>>>>>>>>>> Regards,
>>         >>>>>>>>>>>> Timo
>>         >>>>>>>>>>>>
>>         >>>>>>>>>>>>
>>         >>>>>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>>         >>>>>>>>>>>>> Thanks for publishing the FLIP!
>>         >>>>>>>>>>>>>
>>         >>>>>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <
>>         >>> dwysakowicz@apache.org <ma...@apache.org>>
>>         >>>>>>>>>>>>> wrote:
>>         >>>>>>>>>>>>>>      1. How to sort/group keys? What
>>         representation of the
>>         >> key
>>         >>>>>>>>>>>>>> should we
>>         >>>>>>>>>>>>>>         use? Should we sort on the binary form
>>         or should we
>>         >>> depend
>>         >>>>>> on
>>         >>>>>>>>>>>>>>         Comparators being available.
>>         >>>>>>>>>>>>> Initially, I suggested to Dawid (in private) to
>>         do the
>>         >>>>>>>>>>>>> sorting/grouping
>>         >>>>>>>>>>>> by using the binary representation. Then my
>>         opinion switched
>>         >> and
>>         >>> I
>>         >>>>>>>>>>>> thought
>>         >>>>>>>>>>>> we should use TypeComparator/Comparator because
>>         that's what the
>>         >>>>>>>>>>>> DataSet API
>>         >>>>>>>>>>>> uses. After talking to Stephan, I'm again
>>         encouraged in my
>>         >>> opinion
>>         >>>>>>>>>>>> to use
>>         >>>>>>>>>>>> the binary representation because it means we
>>         can eventually
>>         >> get
>>         >>> rid
>>         >>>>>>>>>>>> of the
>>         >>>>>>>>>>>> TypeComparator interface, which is a bit
>>         complicated, and
>>         >>> because we
>>         >>>>>>>>>>>> don't
>>         >>>>>>>>>>>> need any good order in our sort, we only need
>>         the grouping.
>>         >>>>>>>>>>>>> This comes with some problems, though: we need
>>         to ensure that
>>         >>> the
>>         >>>>>>>>>>>> TypeSerializer of the type we're sorting is
>>         >> stable/deterministic.
>>         >>>>>>>>>>>> Beam has
>>         >>>>>>>>>>>> infrastructure for this in the form of
>>         >>> Coder.verifyDeterministic()
>>         >>>>>> [1]
>>         >>>>>>>>>>>> which we don't have right now and should add if
>>         we go down this
>>         >>>>>> path.
>>         >>>>>>>>>>>>>>      2. Where in the stack should we apply the
>>         sorting (this is
>>         >>>>>> rather a
>>         >>>>>>>>>>>>>>         discussion about internals)
>>         >>>>>>>>>>>>> Here, I'm gravitating towards the third option
>>         of implementing
>>         >>> it
>>         >>>>>>>>>>>>> in the
>>         >>>>>>>>>>>> layer of the StreamTask, which probably means
>>         implementing a
>>         >>> custom
>>         >>>>>>>>>>>> InputProcessor. I think it's best to do it in
>>         this layer
>>         >> because
>>         >>> we
>>         >>>>>>>>>>>> would
>>         >>>>>>>>>>>> not mix concerns of different layers as we would
>>         if we
>>         >>> implemented
>>         >>>>>>>>>>>> this as
>>         >>>>>>>>>>>> a custom StreamOperator. I think this solution
>>         is also best
>>         >> when
>>         >>> it
>>         >>>>>>>>>>>> comes
>>         >>>>>>>>>>>> to multi-input operators.
>>         >>>>>>>>>>>>>>      3. How should we deal with custom
>>         implementations of
>>         >>>>>>>>>>>>>> StreamOperators
>>         >>>>>>>>>>>>> I think the cleanest solution would be to go
>>         through the
>>         >>> complete
>>         >>>>>>>>>>>> operator lifecycle for every key, because then
>>         the watermark
>>         >>> would
>>         >>>>>> not
>>         >>>>>>>>>>>> oscillate between -Inf and +Inf and we would not
>>         break the
>>         >>>>>> semantical
>>         >>>>>>>>>>>> guarantees that we gave to operators so far, in
>>         that the
>>         >>> watermark
>>         >>>>>> is
>>         >>>>>>>>>>>> strictly monotonically increasing. However, I
>>         don't think this
>>         >>>>>>>>>>>> solution is
>>         >>>>>>>>>>>> feasible because it would come with too much
>>         overhead. We
>>         >> should
>>         >>>>>>>>>>>> solve this
>>         >>>>>>>>>>>> problem via documentation and maybe educate
>>         people to not query
>>         >>> the
>>         >>>>>>>>>>>> current
>>         >>>>>>>>>>>> watermark or not rely on the watermark being
>>         monotonically
>>         >>>>>>>>>>>> increasing in
>>         >>>>>>>>>>>> operator implementations to allow the framework
>>         more freedoms
>>         >> in
>>         >>> how
>>         >>>>>>>>>>>> user
>>         >>>>>>>>>>>> programs are executed.
>>         >>>>>>>>>>>>> Aljoscha
>>         >>>>>>>>>>>>>
>>         >>>>>>>>>>>>> [1]
>>         >>
>>         https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>>         >>>
>>

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Yu Li <ca...@gmail.com>.
*bq. The problem is that I could not use this "state backend" in a
StreamOperator.*
Ok, got your point now. I agree that it makes more sense to
make StateBackend return a contract instead of a particular implementation.
How about we name the new interface as `CheckpointableKeyedStateBackend`?
We could make `BoundedStreamStateBackend` implement
`CheckpointableKeyedStateBackend` without the checkpoint-related operations
for now, while reserving the possibility that bounded streams also
support checkpointing in the future. What do you think?
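
A minimal sketch of that split, assuming stand-in types rather than Flink's
real ones: the `CheckpointableKeyedStateBackend` below is a simplified,
hypothetical version of the proposed contract, `snapshot` and its `byte[]`
return type are placeholders, `OperatorSketch` is a hypothetical caller, and
throwing `UnsupportedOperationException` is only one possible reading of
"without the checkpoint-related operations for now".

```java
// Hypothetical, simplified contract; Flink's real interfaces carry many more methods.
interface CheckpointableKeyedStateBackend<K> extends AutoCloseable {
    void setCurrentKey(K key);

    K getCurrentKey();

    // Placeholder for the checkpoint-related part of the contract.
    byte[] snapshot(long checkpointId);

    @Override
    void close();
}

// Bounded-stream backend: fulfils the contract but does not checkpoint yet.
final class BoundedStreamStateBackend<K> implements CheckpointableKeyedStateBackend<K> {
    private K currentKey;

    @Override
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    @Override
    public K getCurrentKey() {
        return currentKey;
    }

    @Override
    public byte[] snapshot(long checkpointId) {
        // Assumption: unsupported for now, could be implemented later.
        throw new UnsupportedOperationException("bounded execution does not checkpoint yet");
    }

    @Override
    public void close() {
        currentKey = null;
    }
}

// Hypothetical caller: depends on the contract only, never on a concrete class.
final class OperatorSketch<K> {
    private final CheckpointableKeyedStateBackend<K> backend;

    OperatorSketch(CheckpointableKeyedStateBackend<K> backend) {
        this.backend = backend;
    }

    K process(K key) {
        backend.setCurrentKey(key);
        return backend.getCurrentKey();
    }
}
```

The point of the sketch is only that the operator code compiles against the
interface, so a different backend could be swapped in without touching it.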

*bq. Correct, the goal though is not to outperform the HeapStateBackend.
The single key state backend requires sorted inputs which come with a
price. The main goal is to outperform RocksDBStateBackend, which is
necessary for large states.*
Personally I think the main benefit of introducing a bounded stream
specific state backend is that we could remove the data after processing a
key, thus reducing the cost of state storage a lot, rather than the routine
performance of state processing (smile).
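
To illustrate the storage point, here is a rough sketch of the behaviour
Dawid described earlier in the thread (keep state for the current key only and
drop it when `setCurrentKey` sees a different key). `SingleKeyStateSketch` and
`sumGrouped` are hypothetical names, not Flink classes, and the input is
assumed to arrive already sorted/grouped by key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch: holds a single value, for the current key only.
final class SingleKeyStateSketch<K, V> {
    private K currentKey;
    private V value;

    void setCurrentKey(K newKey) {
        if (!Objects.equals(currentKey, newKey)) {
            // Key switch: the previous key is finished, so its state is dropped.
            value = null;
            currentKey = newKey;
        }
    }

    V value() {
        return value;
    }

    void update(V newValue) {
        value = newValue;
    }

    // Sums values per key over input that is already grouped by key.
    static List<String> sumGrouped(String[] keys, int[] values) {
        SingleKeyStateSketch<String, Integer> state = new SingleKeyStateSketch<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            // Emit the finished key before its state is dropped.
            if (state.currentKey != null && !state.currentKey.equals(keys[i])) {
                out.add(state.currentKey + "=" + state.value());
            }
            state.setCurrentKey(keys[i]);
            Integer current = state.value();
            state.update(current == null ? values[i] : current + values[i]);
        }
        if (state.currentKey != null) {
            out.add(state.currentKey + "=" + state.value());
        }
        return out;
    }
}
```

At no point does the sketch hold state for more than one key, which is where
the reduction in state storage would come from.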

Thanks.

Best Regards,
Yu


On Fri, 18 Sep 2020 at 20:48, Dawid Wysakowicz <dw...@apache.org>
wrote:

> ===============================================
> class BoundedStreamInternalStateBackend<K> implements
>         KeyedStateBackend<K>,
>         SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>         Closeable,
>         CheckpointListener {
> ===============================================
>
> The problem is that I could not use this "state backend" in a
> StreamOperator. The goal of this effort is that it is mostly transparent to
> all the implementations of StreamOperator(s). Right now StreamOperator
> retrieves AbstractKeyedStateBackend through StreamOperatorContext which
> instantiates it in StreamTaskInitializer etc. The problem is that a big
> chunk of the current code base uses the AbstractKeyedStateBackend, whereas
> it really just needs an interface not that particular implementation. The
> change is really only about separating the contract
> (InternalKeyedStateBackend) from the implementation
> (AbstractKeyedStateBackend). My thinking is that it is only an approach to
> fix a mistake of the past that StateBackend returns a particular
> implementation rather than a contract.
>
> I do agree I don't need the `SnapshotStrategy` and `CheckpointListener`
> interfaces. The thing though is that the runtime expects those contracts
> from an AbstractKeyedStateBackend.
>
> BTW, if you'd like to see what this change really looks like, you can
> check the PR I already opened for it:
> https://github.com/apache/flink/pull/13405/files
>
> Checking the FLIP more closely I found the description below: "With a high
> number of keys it (HeapStateBackend) suffers a significant penalty and
> becomes even less performant for that particular case than the sorting
> approach", does it mean "HeapStateBackend" outperformed
> "SingleKeyStateBackend" when the number of keys is relatively small?
>
> Correct, the goal though is not to outperform the HeapStateBackend. The
> single key state backend requires sorted inputs which come with a price.
> The main goal is to outperform RocksDBStateBackend, which is necessary for
> large states.
>
> Thanks for the summary. I think it's more specific and could help readers
> to better understand why we cannot use HeapKeyedStateBackend directly, than
> the single line description "when the StateBackend observes a new incoming
> key it will reset all acquired state objects so far". What do you think?
>
> Sure, I can add it to the document.
>
> Best,
>
> Dawid
> On 18/09/2020 14:29, Yu Li wrote:
>
> Thanks for the clarification Dawid. Some of my thoughts:
>
> *bq. The results are times for end-to-end execution of a job. Therefore
> the sorting part is included. The actual target of the replacement is
> RocksDB, which does the serialization and key bytes comparison as well.*
> I see. Checking the FLIP more closely I found the description below: "With a
> high number of keys it (HeapStateBackend) suffers a significant penalty and
> becomes even less performant for that particular case than the sorting
> approach", does it mean "HeapStateBackend" outperformed
> "SingleKeyStateBackend" when the number of keys is relatively small? The
> micro-benchmark of ValueState removes the key shuffling phase, so its
> result is self-explanatory.
>
> About `InternalKeyedStateBackend`, let me rephrase my question: why don't
> we add the new state backend like below instead of adding a new interface
> (and IMHO there's no need to implement the `SnapshotStrategy` and
> `CheckpointListener` interfaces since it doesn't support checkpointing)?
> Is the interface reserved for adding more internal state backends in the future?
> ===============================================
> class BoundedStreamInternalStateBackend<K> implements
>         KeyedStateBackend<K>,
>         SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>         Closeable,
>         CheckpointListener {
> ===============================================
>
> *bq. Let me though quickly summarize and if you find it useful I can add
> it to the FLIP itself.*
> Thanks for the summary. I think it's more specific and could help readers
> to better understand why we cannot use HeapKeyedStateBackend directly, than
> the single line description "when the StateBackend observes a new incoming
> key it will reset all acquired state objects so far". What do you think?
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Thu, 17 Sep 2020 at 23:38, Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
>> Thanks for the comments Yu.
>>
>> > First of all, for the performance testing result, I'm wondering whether
>> the
>> > sorting cost is counted in the result for both DataSet and refined
>> > DataStream implementations. I could think of the saving of hash
>> computation
>> > and final iteration to emit the word-count result (processing a key at a
>> > time could save such iteration), but not sure whether these cost savings
>> > are at the same grade of comparing the key bytes.
>> The results are times for end-to-end execution of a job. Therefore the
>> sorting part is included. The actual target of the replacement is
>> RocksDB, which does the serialization and key bytes comparison as well.
>> On top of that it adds all the RocksDB bookkeeping.
>>
>> > However, I'm not fully convinced to introduce a new
>> > `InternalKeyedStateBackend` interface. I agree that we don't need to
>> take
>> > the overhead of `AbstractKeyedStateBackend` since we don't plan to
>> support
>> > checkpoint for now, but why don't we directly write a state backend
>> > implementation for bounded stream? Or are we planning to introduce more
>> > internal state backends in future? What's more, the current design of
>> > `InternalKeyedStateBackend` in the FLIP document seems to be extending
>> as
>> > many interfaces as `AbstractKeyedStateBackend` implements, which I
>> guess
>> > is a typo.
>> Maybe I was not clear enough about the change. This change does not
>> "strip" the AbstractKeyedStateBackend of any functionalities. My intent
>> is not to remove any methods of the AbstractKeyedStateBackend. The
>> problem here is that the AbstractKeyedStateBackend is an abstract class
>> (duh ;)), which does have some predefined implementation. Moreover it
>> requires objects such as InternalKeyContext, CloseableRegistry etc. to be
>> constructed, which we don't need/want e.g. in the single key state
>> backend. My intention here is to make the StateBackend return only pure
>> interfaces. (AbstractKeyedStateBackend is the only non-interface that
>> StateBackend returns). In other words I just want to make
>> AbstractKeyedStateBackend a proper interface. It is not a typo that
>> InternalKeyedStateBackend extends the same interfaces as
>> AbstractKeyedStateBackend does.
>>
>> > Thirdly, I suggest we name the special state backend as
>> > `BoundedStreamInternalStateBackend`. And from our existing javadoc of
>> > `StateBackend` it actually cannot be called a complete state
>> backend...: "A
>> > State Backend defines how the state of a streaming application is stored
>> > and checkpointed".
>> Thanks for the suggestion. Sure I can use that name. Yes I do agree it
>> is not a full-fledged StateBackend. I do want it to be an internal
>> class, that is never used explicitly by users.
>>
>> > Lastly, I didn't find a detailed design of the "SingleKeyStateBackend"
>> in
>> > the FLIP,
>> I did not put it into the design, because 1) I found it internal. It
>> does not touch any public facing interfaces. 2) It is rather
>> straightforward. Let me though quickly summarize and if you find it
>> useful I can add it to the FLIP itself.
>>
>> > as how to detect the key switching
>> That is rather straightforward. The state backend works only with the
>> assumption that the keys are sorted/grouped together. We keep the
>> current key and in the setCurrentKey we check if the new key is
>> different than the current one. Side note: yes, custom user operators
>> which call setCurrentKey explicitly might not work in this setup.
>>
>> > remove the data (especially in the non-windowing
>> > case), etc.
>> We only ever keep a single value for a state object. Therefore
>> ValueState is a very thin wrapper for a value, MapState for a HashMap,
>> ListState for a List etc. When the key changes we simply set the wrapped
>> value/map/state to null.
>>
>> I hope this clarifies a few things. Let me know if you have any questions.
>>
>> Best,
>>
>> Dawid
>>
>> On 17/09/2020 15:28, Yu Li wrote:
>> > Hi all,
>> >
>> > Sorry for being late to the discussion, but I just noticed there are
>> some
>> > state backend related changes proposed in this FLIP, so would like to
>> share
>> > my two cents.
>> >
>> > First of all, for the performance testing result, I'm wondering whether
>> the
>> > sorting cost is counted in the result for both DataSet and refined
>> > DataStream implementations. I could think of the saving of hash
>> computation
>> > and final iteration to emit the word-count result (processing a key at a
>> > time could save such iteration), but not sure whether these cost savings
>> > are at the same grade of comparing the key bytes.
>> >
>> > Regardless of the performance result, I agree that the capability of
>> > removing the data after processing a key could prominently reduce the
>> space
>> > required by state, so introducing a new state backend for bounded stream
>> > makes sense.
>> >
>> > However, I'm not fully convinced to introduce a new
>> > `InternalKeyedStateBackend` interface. I agree that we don't need to
>> take
>> > the overhead of `AbstractKeyedStateBackend` since we don't plan to
>> support
>> > checkpoint for now, but why don't we directly write a state backend
>> > implementation for bounded stream? Or are we planning to introduce more
>> > internal state backends in future? What's more, the current design of
>> > `InternalKeyedStateBackend` in the FLIP document seems to be extending
>> as
>> > many interfaces as `AbstractKeyedStateBackend` implements, which I
>> guess
>> > is a typo.
>> >
>> > Thirdly, I suggest we name the special state backend as
>> > `BoundedStreamInternalStateBackend`. And from our existing javadoc of
>> > `StateBackend` it actually cannot be called a complete state
>> backend...: "A
>> > State Backend defines how the state of a streaming application is stored
>> > and checkpointed".
>> >
>> > Lastly, I didn't find a detailed design of the "SingleKeyStateBackend"
>> in
>> > the FLIP, and suggest we write the key design down, such as how to
>> detect
>> > the key switching and remove the data (especially in the non-windowing
>> > case), etc.
>> >
>> > Thanks.
>> >
>> > Best Regards,
>> > Yu
>> >
>> >
>> > On Wed, 9 Sep 2020 at 17:18, Kurt Young <yk...@gmail.com> wrote:
>> >
>> >> Yes, I didn't intend to block this FLIP, and some of the comments are
>> >> actually implementation details.
>> >> And all of them are handled internally, not visible to users, thus we
>> can
>> >> also change or improve them
>> >> in the future.
>> >>
>> >> Best,
>> >> Kurt
>> >>
>> >>
>> >> On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek <al...@apache.org>
>> >> wrote:
>> >>
>> >>> I think Kurt's concerns/comments are very valid and we need to
>> implement
>> >>> such things in the future. However, I also think that we need to get
>> >>> started somewhere and I think what's proposed in this FLIP is a good
>> >>> starting point that we can build on. So we should not get paralyzed by
>> >>> thinking too far ahead into the future. Does that make sense?
>> >>>
>> >>> Best,
>> >>> Aljoscha
>> >>>
>> >>> On 08.09.20 16:59, Dawid Wysakowicz wrote:
>> >>>> Ad. 1
>> >>>>
>> >>>> Yes, you are right in principle.
>> >>>>
>> >>>> Let me though clarify my proposal a bit. The proposed sort-style
>> >>>> execution aims at a generic KeyedProcessFunction where all the
>> >>>> "aggregations" are actually performed in the user code. It tries to
>> >>>> improve the performance by actually removing the need to use RocksDB
>> >>> e.g.:
>> >>>>      private static final class Summer<K>
>> >>>>              extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>> >>>>
>> >>>>          ....
>> >>>>
>> >>>>          @Override
>> >>>>          public void processElement(
>> >>>>                  Tuple2<K, Integer> value,
>> >>>>                  Context ctx,
>> >>>>                  Collector<Tuple2<K, Integer>> out) throws Exception {
>> >>>>              if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>> >>>>                  ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>> >>>>                  timerRegistered.update(true);
>> >>>>              }
>> >>>>              Integer v = counter.value();
>> >>>>              Integer incomingValue = value.f1;
>> >>>>              if (v != null) {
>> >>>>                  v += incomingValue;
>> >>>>              } else {
>> >>>>                  v = incomingValue;
>> >>>>              }
>> >>>>              counter.update(v);
>> >>>>          }
>> >>>>
>> >>>>          ....
>> >>>>
>> >>>>     }
>> >>>>
>> >>>> Therefore I don't think the first part of your reply with separating
>> >> the
>> >>>> write and read workload applies here. We do not aim to create a
>> >>>> competing API with the Table API. We think operations such as joins
>> or
>> >>>> analytical aggregations should be performed in Table API.
>> >>>>
>> >>>> As for the second part I agree it would be nice to fall back to the
>> >>>> sorting approach only if a certain threshold of memory in a State
>> >>>> Backend is used. This has some problems though. We would need a way
>> to
>> >>>> estimate the size of the occupied memory to tell when the threshold
>> is
>> >>>> reached. That is not easily doable by default e.g. in a
>> >>>> MemoryStateBackend, as we do not serialize the values in the state
>> >>>> backend by default. We would have to add that, but this would add the
>> >>>> overhead of the serialization.
>> >>>>
>> >>>> This proposal aims at the cases where we do have a large state that
>> >> will
>> >>>> not fit into the memory and without the change users are forced to
>> use
>> >>>> RocksDB. If the state fits in memory I agree it will be better to do
>> >>>> hash-based aggregations e.g. using the MemoryStateBackend. Therefore
>> I
>> >>>> think it is important to give users the choice to use one or the
>> other
>> >>>> approach. We might discuss which approach should be the default for
>> >>>> RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
>> >>>> user configured state backend or sorting-based with a single key at a
>> >>>> time backend. Moreover we could think if we should let users choose
>> the
>> >>>> sort vs hash "state backend" per operator. Would that suffice?
>> >>>>
>> >>>> Ad. 2
>> >>>>
>> >>>> I still think we can just use the first X bytes of the serialized
>> form
>> >>>> as the normalized key and fall back to comparing full keys on clashes.
>> >> It
>> >>>> is because we are actually not interested in a logical order, but we
>> >>>> care only about the "grouping" aspect of the sorting. Therefore I
>> think
>> >>>> it's enough to compare only parts of the full key as the normalized
>> key.
>> >>>>
>> >>>> Thanks again for the really nice and thorough feedback!
>> >>>>
>> >>>> Best,
>> >>>>
>> >>>> Dawid
>> >>>>
>> >>>> On 08/09/2020 14:47, Kurt Young wrote:
>> >>>>> Regarding #1, yes the state backend is definitely hash-based
>> >> execution.
>> >>>>> However, there are some differences from
>> >>>>> batch hash-based execution. The key difference is *random access &
>> >>>>> read/write mixed workload*. For example, by using
>> >>>>> a state backend in streaming execution, one has to mix the read and
>> >> write
>> >>>>> operations and all of them are actually random
>> >>>>> access. But in a batch hash execution, we could divide the phases
>> into
>> >>>>> write and read. For example, we can build the
>> >>>>> hash table first, with only write operations. And once the build is
>> >>> done,
>> >>>>> we can start to read and trigger the user codes.
>> >>>>> Take the hash aggregation that the Blink planner implements as an example:
>> >>> during
>> >>>>> the building phase, as long as the hash map
>> >>>>> could fit into memory, we will update the accumulators directly in
>> the
>> >>> hash
>> >>>>> map. And once we are running out of memory,
>> >>>>> we then fall back to sort based execution. It improves the
>> >> performance a
>> >>>>> lot if the incoming data can be processed in
>> >>>>> memory.
>> >>>>>
>> >>>>> Regarding #2, IIUC you are actually describing a binary format of
>> key,
>> >>> not
>> >>>>> normalized key which is used in DataSet. I will
>> >>>>> take String for example. If we have lots of keys with length all
>> >> greater
>> >>>>> than, let's say 20. In your proposal, you will encode
>> >>>>> the whole string in the prefix of your composed data ( <key> +
>> >>> <timestamp>
>> >>>>> + <record> ). And when you compare
>> >>>>> records, you will actually compare the *whole* key of the record.
>> For
>> >>>>> normalized key, it's fixed-length in this case, IIRC it will
>> >>>>> take 8 bytes to represent the string. And the sorter will store the
>> >>>>> normalized key and offset in a dedicated array. When doing
>> >>>>> the sorting, it only sorts this *small* array. If the normalized
>> keys
>> >>> are
>> >>>>> different, you could immediately tell which is greater from
>> >>>>> normalized keys. You only have to compare the full keys if the
>> >>> normalized
>> >>>>> keys are equal and you know in this case the normalized
>> >>>>> key couldn't represent the full key. The reason why DataSet is doing
>> >>> this
>> >>>>> is it's super cache efficient by sorting the *small* array.
>> >>>>> The idea is borrowed from this paper [1]. Let me know if I missed or
>> >>>>> misunderstood anything.
>> >>>>>
>> >>>>> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
>> >>>>> cache-sensitive parallel external sort)
>> >>>>>
>> >>>>> Best,
>> >>>>> Kurt
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <
>> >> dwysakowicz@apache.org
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Hey Kurt,
>> >>>>>>
>> >>>>>> Thank you for comments!
>> >>>>>>
>> >>>>>> Ad. 1 I might have missed something here, but as far as I can see,
>> >>>>>> using the current execution stack with regular state backends
>> >> (RocksDB
>> >>>>>> in particular if we want to have spilling capabilities) is
>> equivalent
>> >>> to
>> >>>>>> hash-based execution. I can see a different spilling state backend
>> >>>>>> implementation in the future, but I think it is not batch specific.
>> Or
>> >>> am
>> >>>>>> I missing something?
>> >>>>>>
>> >>>>>> Ad. 2 Totally agree that normalized keys are important to the
>> >>>>>> performance. I think though TypeComparators are not a necessity to
>> >> have
>> >>>>>> that. Actually  this proposal is heading towards only ever
>> performing
>> >>>>>> "normalized keys" comparison. I have not included in the proposal
>> the
>> >>>>>> binary format which we will use for sorting (partially because I
>> >>> forgot,
>> >>>>>> and partially because I thought it was too much of an
>> implementation
>> >>>>>> detail). Let me include it here though, as it might clear the
>> >> situation
>> >>>>>> a bit here.
>> >>>>>>
>> >>>>>> In DataSet, at times we have KeySelectors which extract keys based
>> on
>> >>>>>> field indices or names. In certain situations this allows us to extract
>> >> the
>> >>>>>> key from serialized records. Compared to DataSet, in DataStream,
>> the
>> >>> key
>> >>>>>> is always described with a black-box KeySelector, or differently
>> >> with a
>> >>>>>> function which extracts a key from a deserialized record.  In turn
>> >>> there
>> >>>>>> is no way to create a comparator that could compare records by
>> >>>>>> extracting the key from a serialized record (neither with, nor
>> >> without
>> >>>>>> key normalization). We suggest that the input for the sorter will
>> be
>> >>>>>>
>> >>>>>> <key> + <timestamp> + <record>
>> >>>>>>
>> >>>>>> Without having the key prepended we would have to deserialize the
>> >>> record
>> >>>>>> for every key comparison.
>> >>>>>>
>> >>>>>> Therefore if we agree that we perform binary comparison for keys
>> >> (which
>> >>>>>> are always prepended), it is actually equivalent to a DataSet with
>> >>>>>> TypeComparators that support key normalization.
>> >>>>>>
>> >>>>>> Let me know if that is clear, or I have missed something here.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>>
>> >>>>>> Dawid
>> >>>>>>
>> >>>>>> On 08/09/2020 03:39, Kurt Young wrote:
>> >>>>>>> Hi Dawid, thanks for bringing this up, it's really exciting to see
>> >>> that
>> >>>>>>> batch execution is introduced in DataStream. From the flip, it
>> seems
>> >>>>>>> we are sticking with sort based execution mode (at least for now),
>> >>> which
>> >>>>>>> will sort the whole input data before any *keyed* operation is
>> >>>>>>> executed. I have two comments here:
>> >>>>>>>
>> >>>>>>> 1. Do we want to introduce hash-based execution in the future?
>> Sort
>> >>> is a
>> >>>>>>> safe choice but not the best in lots of cases. IIUC we only need
>> >>>>>>> to make sure that before the framework finishes dealing with one
>> >> key,
>> >>> the
>> >>>>>>> operator doesn't see any data belonging to other keys, thus
>> >>>>>>> hash-based execution would also do the trick. One tricky thing the
>> >>>>>>> framework might need to deal with is memory constraint and
>> spilling
>> >>>>>>> in the hash map, but Flink also has some good knowledge about
>> these
>> >>>>>> stuff.
>> >>>>>>> 2. Going back to sort-based execution and how to sort keys. From
>> my
>> >>>>>>> experience, the performance of sorting would be one of the most
>> >> important
>> >>>>>>> things if we want to achieve good performance of batch execution.
>> >> And
>> >>>>>>> normalized keys are actually the key to the performance of
>> sorting.
>> >>>>>>> If we want to get rid of TypeComparator, I think we still need to
>> >>> find a
>> >>>>>>> way to introduce this back.
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Kurt
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <
>> >> aljoscha@apache.org>
>> >>>>>> wrote:
>> >>>>>>>> Yes, I think we can address the problem of indeterminacy in a
>> >>> separate
>> >>>>>>>> FLIP because we're already in it.
>> >>>>>>>>
>> >>>>>>>> Aljoscha
>> >>>>>>>>
>> >>>>>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>> >>>>>>>>> @Seth That's a very good point. I agree that RocksDB has the
>> same
>> >>>>>>>>> problem. I think we can use the same approach for the sorted
>> >>> shuffles
>> >>>>>>>>> then. @Aljoscha I agree we should think about making it more
>> >>> resilient,
>> >>>>>>>>> as I guess users might have problems already if they use keys
>> with
>> >>>>>>>>> non-deterministic binary representation. How do you feel about
>> >>>>>>>>> addressing that separately purely to limit the scope of this
>> FLIP?
>> >>>>>>>>>
>> >>>>>>>>> @Aljoscha I tend to agree with you that the best place to
>> actually
>> >>>>>> place
>> >>>>>>>>> the sorting would be in the InputProcessor(s). If there are no
>> >> more
>> >>>>>>>>> suggestions in respect to that issue. I'll put this proposal for
>> >>>>>> voting.
>> >>>>>>>>> @all Thank you for the feedback so far. I'd like to start a
>> voting
>> >>>>>>>>> thread on the proposal tomorrow. Therefore I'd appreciate if you
>> >>>>>> comment
>> >>>>>>>>> before that, if you still have some outstanding ideas.
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>>
>> >>>>>>>>> Dawid
>> >>>>>>>>>
>> >>>>>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>> >>>>>>>>>> Seth is right, I was just about to write that as well. There
>> is a
>> >>>>>>>>>> problem, though, because some of our TypeSerializers are not
>> >>>>>>>>>> deterministic even though we use them as if they were. Beam
>> >>> excludes
>> >>>>>>>>>> the FloatCoder, for example, and the AvroCoder in certain
>> cases.
>> >>> I'm
>> >>>>>>>>>> pretty sure there is also weirdness going on in our
>> >> KryoSerializer.
>> >>>>>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>> >>>>>>>>>>> There is already an implicit assumption the TypeSerializer for
>> >>> keys
>> >>>>>> is
>> >>>>>>>>>>> stable/deterministic, RocksDB compares keys using their
>> >> serialized
>> >>>>>> byte
>> >>>>>>>>>>> strings. I think this is a non-issue (or at least it's not
>> >>> changing
>> >>>>>> the
>> >>>>>>>>>>> status quo).
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <
>> twalthr@apache.org
>> >>>>>>>> wrote:
>> >>>>>>>>>>>> +1 for getting rid of the TypeComparator interface and rely
>> on
>> >>> the
>> >>>>>>>>>>>> serialized representation for grouping.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Adding a new type to DataStream API is quite difficult at the
>> >>> moment
>> >>>>>>>>>>>> due
>> >>>>>>>>>>>> to too many components that are required: TypeInformation
>> >> (tries
>> >>> to
>> >>>>>>>>>>>> deal
>> >>>>>>>>>>>> with logical fields for TypeComparators), TypeSerializer
>> (incl.
>> >>> its
>> >>>>>>>>>>>> snapshot interfaces), and TypeComparator (with many methods
>> and
>> >>>>>>>>>>>> internals such as normalized keys etc.).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> If necessary, we can add more simple comparison-related
>> methods
>> >>> to
>> >>>>>> the
>> >>>>>>>>>>>> TypeSerializer interface itself in the future (like
>> >>>>>>>>>>>> TypeSerializer.isDeterministic).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Regards,
>> >>>>>>>>>>>> Timo
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>> >>>>>>>>>>>>> Thanks for publishing the FLIP!
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <
>> >>> dwysakowicz@apache.org>
>> >>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>      1. How to sort/group keys? What representation of the
>> >> key
>> >>>>>>>>>>>>>> should we
>> >>>>>>>>>>>>>>         use? Should we sort on the binary form or should we
>> >>> depend
>> >>>>>> on
>> >>>>>>>>>>>>>>         Comparators being available.
>> >>>>>>>>>>>>> Initially, I suggested to Dawid (in private) to do the
>> >>>>>>>>>>>>> sorting/grouping
>> >>>>>>>>>>>> by using the binary representation. Then my opinion switched
>> >> and
>> >>> I
>> >>>>>>>>>>>> thought
>> >>>>>>>>>>>> we should use TypeComparator/Comparator because that's what
>> the
>> >>>>>>>>>>>> DataSet API
>> >>>>>>>>>>>> uses. After talking to Stephan, I'm again encouraged in my
>> >>> opinion
>> >>>>>>>>>>>> to use
>> >>>>>>>>>>>> the binary representation because it means we can eventually
>> >> get
>> >>> rid
>> >>>>>>>>>>>> of the
>> >>>>>>>>>>>> TypeComparator interface, which is a bit complicated, and
>> >>> because we
>> >>>>>>>>>>>> don't
>> >>>>>>>>>>>> need any good order in our sort, we only need the grouping.
>> >>>>>>>>>>>>> This comes with some problems, though: we need to ensure
>> that
>> >>> the
>> >>>>>>>>>>>> TypeSerializer of the type we're sorting is
>> >> stable/deterministic.
>> >>>>>>>>>>>> Beam has
>> >>>>>>>>>>>> infrastructure for this in the form of
>> >>> Coder.verifyDeterministic()
>> >>>>>> [1]
>> >>>>>>>>>>>> which we don't have right now and should add if we go down
>> this
>> >>>>>> path.
>> >>>>>>>>>>>>>>      2. Where in the stack should we apply the sorting
>> (this
>> >>>>>> rather a
>> >>>>>>>>>>>>>>         discussion about internals)
>> >>>>>>>>>>>>> Here, I'm gravitating towards the third option of
>> implementing
>> >>> it
>> >>>>>>>>>>>>> in the
>> >>>>>>>>>>>> layer of the StreamTask, which probably means implementing a
>> >>> custom
>> >>>>>>>>>>>> InputProcessor. I think it's best to do it in this layer
>> >> because
>> >>> we
>> >>>>>>>>>>>> would
>> >>>>>>>>>>>> not mix concerns of different layers as we would if we
>> >>> implemented
>> >>>>>>>>>>>> this as
>> >>>>>>>>>>>> a custom StreamOperator. I think this solution is also best
>> >> when
>> >>> it
>> >>>>>>>>>>>> comes
>> >>>>>>>>>>>> to multi-input operators.
>> >>>>>>>>>>>>>>      3. How should we deal with custom implementations of
>> >>>>>>>>>>>>>> StreamOperators
>> >>>>>>>>>>>>> I think the cleanest solution would be to go through the
>> >>> complete
>> >>>>>>>>>>>> operator lifecycle for every key, because then the watermark
>> >>> would
>> >>>>>> not
>> >>>>>>>>>>>> oscillate between -Inf and +Inf and we would not break the
>> >>>>>> semantical
>> >>>>>>>>>>>> guarantees that we gave to operators so far, in that the
>> >>> watermark
>> >>>>>> is
>> >>>>>>>>>>>> strictly monotonically increasing. However, I don't think
>> this
>> >>>>>>>>>>>> solution is
>> >>>>>>>>>>>> feasible because it would come with too much overhead. We
>> >> should
>> >>>>>>>>>>>> solve this
>> >>>>>>>>>>>> problem via documentation and maybe educate people to not
>> query
>> >>> the
>> >>>>>>>>>>>> current
>> >>>>>>>>>>>> watermark or not rely on the watermark being monotonically
>> >>>>>>>>>>>> increasing in
>> >>>>>>>>>>>> operator implementations to allow the framework more freedoms
>> >> in
>> >>> how
>> >>>>>>>>>>>> user
>> >>>>>>>>>>>> programs are executed.
>> >>>>>>>>>>>>> Aljoscha
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> [1]
>> >>
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>> >>>
>>
>>

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Dawid Wysakowicz <dw...@apache.org>.
> ===============================================
> class BoundedStreamInternalStateBackend<K> implements
>         KeyedStateBackend<K>,
>         SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>         Closeable,
>         CheckpointListener {
> ===============================================

The problem is that I could not use this "state backend" in a
StreamOperator. The goal of this effort is that it is mostly transparent
to all the implementations of StreamOperator(s). Right now
StreamOperator retrieves AbstractKeyedStateBackend through
StreamOperatorContext which instantiates it in StreamTaskInitializer
etc. The problem is that a big chunk of the current code base uses the
AbstractKeyedStateBackend, whereas it really just needs an interface not
that particular implementation. The change is really only about
separating the contract (InternalKeyedStateBackend) from the
implementation (AbstractKeyedStateBackend). As I see it, this merely
fixes a mistake of the past, namely that StateBackend returns a
particular implementation rather than a contract.
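
To illustrate the contract/implementation split described above, here is a
minimal sketch. All names in it (KeyedBackendContract, AbstractBackend,
HashBackend, SingleKeyBackend) are hypothetical stand-ins and not the real
Flink classes; it only shows that operator-side code depending on an
interface can accept a lightweight backend that does not extend the
abstract class.

```java
import java.util.HashMap;
import java.util.Map;

public class ContractVsImplementationSketch {

    /** Hypothetical contract; plays the role the thread assigns to InternalKeyedStateBackend. */
    interface KeyedBackendContract<K> {
        void setCurrentKey(K key);
        K getCurrentKey();
    }

    /** Hypothetical abstract class with predefined machinery that every subclass inherits. */
    abstract static class AbstractBackend<K> implements KeyedBackendContract<K> {
        // Per-key bookkeeping that a single-key backend would not need.
        protected final Map<K, Object> perKeyBookkeeping = new HashMap<>();
        private K currentKey;

        @Override
        public void setCurrentKey(K key) {
            perKeyBookkeeping.putIfAbsent(key, new Object());
            currentKey = key;
        }

        @Override
        public K getCurrentKey() {
            return currentKey;
        }
    }

    /** A regular backend built on top of the abstract class. */
    static final class HashBackend<K> extends AbstractBackend<K> {}

    /** A lightweight backend that implements the contract directly. */
    static final class SingleKeyBackend<K> implements KeyedBackendContract<K> {
        private K currentKey;

        @Override
        public void setCurrentKey(K key) {
            currentKey = key;
        }

        @Override
        public K getCurrentKey() {
            return currentKey;
        }
    }

    /** Operator-side code: depends only on the contract, not on AbstractBackend. */
    static <K> K process(KeyedBackendContract<K> backend, K key) {
        backend.setCurrentKey(key);
        return backend.getCurrentKey();
    }

    public static void main(String[] args) {
        System.out.println(process(new HashBackend<String>(), "a"));
        System.out.println(process(new SingleKeyBackend<String>(), "b"));
    }
}
```

If process() were declared against AbstractBackend instead, SingleKeyBackend
could not be passed in without inheriting the bookkeeping it does not need.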

I do agree I don't need the `SnapshotStrategy` and `CheckpointListener`
interfaces. The thing though is that the runtime expects those contracts
from an AbstractKeyedStateBackend.

BTW, if you'd like to see what this change really looks like, you can
check the PR I already opened for it:
https://github.com/apache/flink/pull/13405/files

> Checking the FLIP more closely I found below description: "With a high
> number of keys it (HeapStateBackend) suffers a significant penalty and
> becomes even less performant for that particular case than the sorting
> approach", does it mean "HeapStateBackend" outperformed
> "SingleKeyStateBackend" when the number of keys is relatively small?
Correct, the goal though is not to outperform the HeapStateBackend. The
single key state backend requires sorted inputs which come with a price.
The main goal is to outperform RocksDBStateBackend, which is necessary
for large states.
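
As a rough illustration of what "sorted inputs" means here, the sketch
below groups records by comparing the bytes of a prepended serialized key,
in the spirit of the <key> + <timestamp> + <record> layout quoted further
down in this thread. It is only a sketch under assumptions: the Entry class
is hypothetical, UTF-8 encoding stands in for a deterministic key
serializer, and breaking ties on the timestamp is my assumption. The
resulting order has no logical meaning; it only guarantees that equal keys
end up next to each other.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BinaryKeyGroupingSketch {

    /** Hypothetical sorter entry: serialized key first, then timestamp, then the record. */
    static final class Entry {
        final byte[] keyBytes;
        final long timestamp;
        final String record;

        Entry(String key, long timestamp, String record) {
            // Assumption: UTF-8 stands in for a deterministic key serializer.
            this.keyBytes = key.getBytes(StandardCharsets.UTF_8);
            this.timestamp = timestamp;
            this.record = record;
        }
    }

    /** Byte-wise (unsigned) comparison; shorter arrays sort first on a shared prefix. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    /** Compares on the key bytes only; the record itself is never deserialized. */
    static int compare(Entry a, Entry b) {
        int byKey = compareBytes(a.keyBytes, b.keyBytes);
        return byKey != 0 ? byKey : Long.compare(a.timestamp, b.timestamp);
    }

    /** Returns the records in sorted order, so that equal keys are adjacent. */
    static List<String> sortedRecords(List<Entry> input) {
        List<Entry> sorted = new ArrayList<>(input);
        sorted.sort(BinaryKeyGroupingSketch::compare);
        List<String> out = new ArrayList<>();
        for (Entry e : sorted) {
            out.add(e.record);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> input = Arrays.asList(
                new Entry("b", 2L, "b2"),
                new Entry("a", 5L, "a5"),
                new Entry("b", 1L, "b1"),
                new Entry("a", 3L, "a3"));
        System.out.println(sortedRecords(input));
    }
}
```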

> Thanks for the summary. I think it's more specific and could help
> readers to better understand why we cannot use HeapKeyedStateBackend
> directly, than the single line description "when the StateBackend
> observes a new incoming key it will reset all acquired state objects
> so far". What do you think?
Sure, I can add it to the document.
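
For illustration, a minimal sketch of that behaviour: the backend remembers
the current key and, when setCurrentKey sees a different key, resets the
wrapped state to null. The class names (SingleKeyValueState,
SingleKeyBackend) and the runningSums helper are hypothetical and are not
the actual Flink classes; the sketch also assumes the input arrives already
grouped by key.

```java
import java.util.Objects;

public class SingleKeyStateSketch {

    /** Hypothetical thin wrapper around one value, valid only for the current key. */
    static final class SingleKeyValueState<T> {
        private T value;

        T value() {
            return value;
        }

        void update(T newValue) {
            value = newValue;
        }

        void clear() {
            value = null;
        }
    }

    /** Hypothetical backend that holds state for exactly one key at a time. */
    static final class SingleKeyBackend<K> {
        private K currentKey;
        private final SingleKeyValueState<Integer> counter = new SingleKeyValueState<>();

        void setCurrentKey(K newKey) {
            // Key switch detected: drop everything acquired for the previous key.
            if (!Objects.equals(currentKey, newKey)) {
                counter.clear();
                currentKey = newKey;
            }
        }

        SingleKeyValueState<Integer> counter() {
            return counter;
        }
    }

    /** Sums values per key over an input that is already grouped by key. */
    static int[] runningSums(String[] keys, int[] values) {
        SingleKeyBackend<String> backend = new SingleKeyBackend<>();
        int[] sums = new int[keys.length];
        for (int i = 0; i < keys.length; i++) {
            backend.setCurrentKey(keys[i]);
            Integer current = backend.counter().value();
            int updated = (current == null) ? values[i] : current + values[i];
            backend.counter().update(updated);
            sums[i] = updated;
        }
        return sums;
    }

    public static void main(String[] args) {
        int[] sums = runningSums(
                new String[] {"a", "a", "b", "b", "b"},
                new int[] {1, 2, 10, 20, 30});
        System.out.println(java.util.Arrays.toString(sums));
    }
}
```

If the same key showed up again after a different key, its earlier state
would already be gone, which is why this only works on sorted/grouped input.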

Best,

Dawid

On 18/09/2020 14:29, Yu Li wrote:
> Thanks for the clarification Dawid. Some of my thoughts:
>
> bq. The results are times for end-to-end execution of a job.
> Therefore the sorting part is included. The actual target of the
> replacement is RocksDB, which does the serialization and key bytes
> comparison as well.
> I see. Checking the FLIP more closely I found below description: "With
> a high number of keys it (HeapStateBackend) suffers a significant
> penalty and becomes even less performant for that particular case than
> the sorting approach", does it mean "HeapStateBackend" outperformed
> "SingleKeyStateBackend" when the number of keys is relatively small?
> The micro-benchmark of ValueState removes the key shuffling phase, so
> its result could be self-explained.
>
> About `InternalKeyedStateBackend`, let me rephrase my question: why
> don't we add the new state backend like below instead of adding a new
> interface (and IMHO there's no need to implement the
> `SnapshotStrategy` and `CheckpointListener` interfaces since it
> doesn't support checkpoint)? Reserved for adding more internal state
> backends in future?
> ===============================================
> class BoundedStreamInternalStateBackend<K> implements
>         KeyedStateBackend<K>,
>         SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>         Closeable,
>         CheckpointListener {
> ===============================================
>
> bq. Let me though quickly summarize and if you find it useful I can
> add it to the FLIP itself.
> Thanks for the summary. I think it's more specific and could help
> readers to better understand why we cannot use HeapKeyedStateBackend
> directly, than the single line description "when the StateBackend
> observes a new incoming key it will reset all acquired state objects
> so far". What do you think?
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Thu, 17 Sep 2020 at 23:38, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>
>     Thanks for the comments Yu.
>
>     > First of all, for the performance testing result, I'm wondering
>     whether the
>     > sorting cost is counted in the result for both DataSet and refined
>     > DataStream implementations. I could think of the saving of hash
>     computation
>     > and final iteration to emit the word-count result (processing a
>     key at a
>     > time could save such iteration), but not sure whether these cost
>     savings
>     > are at the same grade of comparing the key bytes.
>     The results are times for end-to-end execution of a job. Therefore the
>     sorting part is included. The actual target of the replacement is
>     RocksDB, which does the serialization and key bytes comparison as
>     well.
>     On top of that it adds all the RocksDB bookkeeping.
>
>     > However, I'm not fully convinced to introduce a new
>     > `InternalKeyedStateBackend` interface. I agree that we don't
>     need to take
>     > the overhead of `AbstractKeyedStateBackend` since we don't plan
>     to support
>     > checkpoint for now, but why don't we directly write a state backend
>     > implementation for bounded stream? Or are we planning to
>     introduce more
>     > internal state backends in future? What's more, the current
>     design of
>     > `InternalKeyedStateBackend` in the FLIP document seems to be
>     extending as
>     > many interfaces as `AbstractedKeyedStateBackend` implements,
>     which I guess
>     > is a typo.
>     Maybe I was not clear enough about the change. This change does not
>     "strip" the AbstractKeyedStateBackend of any functionality. My intent
>     is not to remove any methods of the AbstractKeyedStateBackend. The
>     problem here is that the AbstractKeyedStateBackend is an abstract class
>     (duh ;)), which does have some predefined implementation. Moreover it
>     requires objects such as InternalKeyContext, CloseableRegistry etc. to be
>     constructed, which we don't need/want e.g. in the single key state
>     backend. My intention here is to make the StateBackend return only pure
>     interfaces. (AbstractKeyedStateBackend is the only non-interface that
>     StateBackend returns). In other words I just want to make
>     AbstractKeyedStateBackend a proper interface. It is not a typo that
>     InternalKeyedStateBackend extends the same interfaces as
>     AbstractKeyedStateBackend does.
>
>     > Thirdly, I suggest we name the special state backend as
>     > `BoundedStreamInternalStateBackend`. And from our existing
>     javadoc of
>     > `StateBackend` it actually cannot be called a complete state
>     backend...: "A
>     > State Backend defines how the state of a streaming application
>     is stored
>     > and checkpointed".
>     Thanks for the suggestion. Sure I can use that name. Yes I do agree it
>     is not a full fledged StateBackend. I do want it to be an internal
>     class, that is never used explicitly by users.
>
>     > Lastly, I didn't find a detailed design of the
>     "SingleKeyStateBackend" in
>     > the FLIP,
>     I did not put it into the design, because 1) I found it internal. It
>     does not touch any public facing interfaces. 2) It is rather
>     straightforward. Let me though quickly summarize and if you find it
>     useful I can add it to the FLIP itself.
>
>     > as how to detect the key switching
>     That is rather straightforward. The state backend works only with the
>     assumption that the keys are sorted/grouped together. We keep the
>     current key and in the setCurrentKey we check if the new key is
>     different than the current one. Side note: yes, custom user operators
>     which call setCurrentKey explicitly might not work in this setup.
>
>     > remove the data (especially in the non-windowing
>     > case), etc.
>     We only ever keep a single value for a state object. Therefore
>     ValueState is a very thin wrapper for a value, MapState for a HashMap,
>     ListState for a List etc. When the key changes we simply set the
>     wrapped value/map/state to null.
>
>     I hope this clarifies a few things. Let me know if you have any
>     questions.
>
>     Best,
>
>     Dawid
>
>     On 17/09/2020 15:28, Yu Li wrote:
>     > Hi all,
>     >
>     > Sorry for being late to the discussion, but I just noticed there
>     are some
>     > state backend related changes proposed in this FLIP, so would
>     like to share
>     > my two cents.
>     >
>     > First of all, for the performance testing result, I'm wondering
>     whether the
>     > sorting cost is counted in the result for both DataSet and refined
>     > DataStream implementations. I could think of the saving of hash
>     computation
>     > and final iteration to emit the word-count result (processing a
>     key at a
>     > time could save such iteration), but not sure whether these cost
>     savings
>     > are at the same grade of comparing the key bytes.
>     >
>     > Regardless of the performance result, I agree that the capability of
>     > removing the data after processing a key could prominently
>     reduce the space
>     > required by state, so introducing a new state backend for
>     bounded stream
>     > makes sense.
>     >
>     > However, I'm not fully convinced to introduce a new
>     > `InternalKeyedStateBackend` interface. I agree that we don't
>     need to take
>     > the overhead of `AbstractKeyedStateBackend` since we don't plan
>     to support
>     > checkpoint for now, but why don't we directly write a state backend
>     > implementation for bounded stream? Or are we planning to
>     introduce more
>     > internal state backends in future? What's more, the current
>     design of
>     > `InternalKeyedStateBackend` in the FLIP document seems to be
>     extending as
>     > many interfaces as `AbstractedKeyedStateBackend` implements,
>     which I guess
>     > is a typo.
>     >
>     > Thirdly, I suggest we name the special state backend as
>     > `BoundedStreamInternalStateBackend`. And from our existing
>     javadoc of
>     > `StateBackend` it actually cannot be called a complete state
>     backend...: "A
>     > State Backend defines how the state of a streaming application
>     is stored
>     > and checkpointed".
>     >
>     > Lastly, I didn't find a detailed design of the
>     "SingleKeyStateBackend" in
>     > the FLIP, and suggest we write the key design down, such as how
>     to detect
>     > the key switching and remove the data (especially in the
>     non-windowing
>     > case), etc.
>     >
>     > Thanks.
>     >
>     > Best Regards,
>     > Yu
>     >
>     >
>     > On Wed, 9 Sep 2020 at 17:18, Kurt Young <ykt836@gmail.com> wrote:
>     >
>     >> Yes, I didn't intend to block this FLIP, and some of the
>     comments are
>     >> actually implementation details.
>     >> And all of them are handled internally, not visible to users,
>     thus we can
>     >> also change or improve them
>     >> in the future.
>     >>
>     >> Best,
>     >> Kurt
>     >>
>     >>
>     >> On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek <aljoscha@apache.org> wrote:
>     >>
>     >>> I think Kurts concerns/comments are very valid and we need to
>     implement
>     >>> such things in the future. However, I also think that we need
>     to get
>     >>> started somewhere and I think what's proposed in this FLIP is
>     a good
>     >>> starting point that we can build on. So we should not get
>     paralyzed by
>     >>> thinking too far ahead into the future. Does that make sense?
>     >>>
>     >>> Best,
>     >>> Aljoscha
>     >>>
>     >>> On 08.09.20 16:59, Dawid Wysakowicz wrote:
>     >>>> Ad. 1
>     >>>>
>     >>>> Yes, you are right in principle.
>     >>>>
>     >>>> Let me though clarify my proposal a bit. The proposed sort-style
>     >>>> execution aims at a generic KeyedProcessFunction where all the
>     >>>> "aggregations" are actually performed in the user code. It
>     tries to
>     >>>> improve the performance by actually removing the need to use
>     RocksDB
>     >>> e.g.:
>     >>>>      private static final class Summer<K>
>     >>>>              extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>     >>>>
>     >>>>          ....
>     >>>>
>     >>>>          @Override
>     >>>>          public void processElement(
>     >>>>                  Tuple2<K, Integer> value,
>     >>>>                  Context ctx,
>     >>>>                  Collector<Tuple2<K, Integer>> out) throws Exception {
>     >>>>              if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>     >>>>                  ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>     >>>>                  timerRegistered.update(true);
>     >>>>              }
>     >>>>              Integer v = counter.value();
>     >>>>              Integer incomingValue = value.f1;
>     >>>>              if (v != null) {
>     >>>>                  v += incomingValue;
>     >>>>              } else {
>     >>>>                  v = incomingValue;
>     >>>>              }
>     >>>>              counter.update(v);
>     >>>>          }
>     >>>>
>     >>>>          ....
>     >>>>
>     >>>>     }
>     >>>>
>     >>>> Therefore I don't think the first part of your reply with
>     separating
>     >> the
>     >>>> write and read workload applies here. We do not aim to create a
>     >>>> competing API with the Table API. We think operations such as
>     joins or
>     >>>> analytical aggregations should be performed in Table API.
>     >>>>
>     >>>> As for the second part I agree it would be nice to fall back
>     to the
>     >>>> sorting approach only if a certain threshold of memory in a State
>     >>>> Backend is used. This has some problems though. We would need
>     a way to
>     >>>> estimate the size of the occupied memory to tell when the
>     threshold is
>     >>>> reached. That is not easily doable by default e.g. in a
>     >>>> MemoryStateBackend, as we do not serialize the values in the
>     state
>     >>>> backend by default. We would have to add that, but this would
>     add the
>     >>>> overhead of the serialization.
>     >>>>
>     >>>> This proposal aims at the cases where we do have a large
>     state that
>     >> will
>     >>>> not fit into the memory and without the change users are
>     forced to use
>     >>>> RocksDB. If the state fits in memory I agree it will be
>     better to do
>     >>>> hash-based aggregations e.g. using the MemoryStateBackend.
>     Therefore I
>     >>>> think it is important to give users the choice to use one or
>     the other
>     >>>> approach. We might discuss which approach should be the
>     default for
>     >>>> RuntimeMode.BATCH proposed in FLIP-134. Should it be
>     hash-based with
>     >>>> user configured state backend or sorting-based with a single
>     key at a
>     >>>> time backend. Moreover we could think if we should let users
>     choose the
>     >>>> sort vs hash "state backend" per operator. Would that suffice?
>     >>>>
>     >>>> Ad. 2
>     >>>>
>     >>>> I still think we can just use the first X bytes of the
>     serialized form
>     >>>> as the normalized key and fallback to comparing full keys on
>     clashes.
>     >> It
>     >>>> is because we are actually not interested in a logical order,
>     but we
>     >>>> care only about the "grouping" aspect of the sorting.
>     Therefore I think
>     >>>> its enough to compare only parts of the full key as the
>     normalized key.
>     >>>>
>     >>>> Thanks again for the really nice and thorough feedback!
>     >>>>
>     >>>> Best,
>     >>>>
>     >>>> Dawid
>     >>>>
>     >>>> On 08/09/2020 14:47, Kurt Young wrote:
>     >>>>> Regarding #1, yes the state backend is definitely hash-based
>     >>>>> execution. However there are some differences from batch
>     >>>>> hash-based execution. The key difference is *random access &
>     >>>>> read/write mixed workload*. For example, by using a state backend
>     >>>>> in streaming execution, one has to mix the read and write
>     >>>>> operations and all of them are actually random access. But in a
>     >>>>> batch hash execution, we could divide the phases into
>     >>>>> write and read. For example, we can build the
>     >>>>> hash table first, with only write operations. And once the
>     build is
>     >>> done,
>     >>>>> we can start to read and trigger the user codes.
>     >>>>> Take hash aggregation which blink planner implemented as an
>     example,
>     >>> during
>     >>>>> building phase, as long as the hash map
>     >>>>> could fit into memory, we will update the accumulators
>     directly in the
>     >>> hash
>     >>>>> map. And once we are running out of memory,
>     >>>>> we then fall back to sort based execution. It improves the
>     >> performance a
>     >>>>> lot if the incoming data can be processed in
>     >>>>> memory.
>     >>>>>
>     >>>>> Regarding #2, IIUC you are actually describing a binary
>     format of key,
>     >>> not
>     >>>>> normalized key which is used in DataSet. I will
>     >>>>> take String for example. If we have lots of keys with length all
>     >> greater
>     >>>>> than, let's say 20. In your proposal, you will encode
>     >>>>> the whole string in the prefix of your composed data ( <key> +
>     >>> <timestamp>
>     >>>>> + <record> ). And when you compare
>     >>>>> records, you will actually compare the *whole* key of the
>     record. For
>     >>>>> normalized key, it's fixed-length in this case, IIRC it will
>     >>>>> take 8 bytes to represent the string. And the sorter will
>     store the
>     >>>>> normalized key and offset in a dedicated array. When doing
>     >>>>> the sorting, it only sorts this *small* array. If the
>     normalized keys
>     >>> are
>     >>>>> different, you could immediately tell which is greater from
>     >>>>> normalized keys. You only have to compare the full keys if the
>     >>> normalized
>     >>>>> keys are equal and you know in this case the normalized
>     >>>>> key couldn't represent the full key. The reason why Dataset
>     is doing
>     >>> this
>     >>>>> is it's super cache efficient by sorting the *small* array.
>     >>>>> The idea is borrowed from this paper [1]. Let me know if I
>     missed or
>     >>>>> misunderstood anything.
>     >>>>>
>     >>>>> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
>     >>>>> cache-sensitive parallel external sort)
>     >>>>>
>     >>>>> Best,
>     >>>>> Kurt
>     >>>>>
>     >>>>>
>     >>>>> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>     >>>>>
>     >>>>>> Hey Kurt,
>     >>>>>>
>     >>>>>> Thank you for comments!
>     >>>>>>
>     >>>>>> Ad. 1 I might have missed something here, but as far as I
>     see it is
>     >>> that
>     >>>>>> using the current execution stack with regular state backends
>     >> (RocksDB
>     >>>>>> in particular if we want to have spilling capabilities) is
>     equivalent
>     >>> to
>     >>>>>> hash-based execution. I can see a different spilling state
>     backend
>     >>>>>> implementation in the future, but I think it is not batch
>     specific. Or
>     >>> am
>     >>>>>> I missing something?
>     >>>>>>
>     >>>>>> Ad. 2 Totally agree that normalized keys are important to the
>     >>>>>> performance. I think though TypeComparators are not a
>     necessity to
>     >> have
>     >>>>>> that. Actually  this proposal is heading towards only ever
>     performing
>     >>>>>> "normalized keys" comparison. I have not included in the
>     proposal the
>     >>>>>> binary format which we will use for sorting (partially
>     because I
>     >>> forgot,
>     >>>>>> and partially because I thought it was too much of an
>     implementation
>     >>>>>> detail). Let me include it here though, as it might clear the
>     >> situation
>     >>>>>> a bit here.
>     >>>>>>
>     >>>>>> In DataSet, at times we have KeySelectors which extract
>     keys based on
>     >>>>>> field indices or names. This allows in certain situation to
>     extract
>     >> the
>     >>>>>> key from serialized records. Compared to DataSet, in
>     DataStream, the
>     >>> key
>     >>>>>> is always described with a black-box KeySelector, or
>     differently
>     >> with a
>     >>>>>> function which extracts a key from a deserialized record. 
>     In turn
>     >>> there
>     >>>>>> is no way to create a comparator that could compare records by
>     >>>>>> extracting the key from a serialized record (neither with, nor
>     >> without
>     >>>>>> key normalization). We suggest that the input for the
>     sorter will be
>     >>>>>>
>     >>>>>> <key> + <timestamp> + <record>
>     >>>>>>
>     >>>>>> Without having the key prepended we would have to
>     deserialize the
>     >>> record
>     >>>>>> for every key comparison.
>     >>>>>>
>     >>>>>> Therefore if we agree that we perform binary comparison for
>     keys
>     >> (which
>     >>>>>> are always prepended), it is actually equivalent to a
>     DataSet with
>     >>>>>> TypeComparators that support key normalization.
>     >>>>>>
>     >>>>>> Let me know if that is clear, or I have missed something here.
>     >>>>>>
>     >>>>>> Best,
>     >>>>>>
>     >>>>>> Dawid
>     >>>>>>
>     >>>>>> On 08/09/2020 03:39, Kurt Young wrote:
>     >>>>>>> Hi Dawid, thanks for bringing this up, it's really
>     exciting to see
>     >>> that
>     >>>>>>> batch execution is introduced in DataStream. From the
>     flip, it seems
>     >>>>>>> we are sticking with sort based execution mode (at least
>     for now),
>     >>> which
>     >>>>>>> will sort the whole input data before any *keyed* operation is
>     >>>>>>> executed. I have two comments here:
>     >>>>>>>
>     >>>>>>> 1. Do we want to introduce hash-based execution in the
>     future? Sort
>     >>> is a
>     >>>>>>> safe choice but not the best in lots of cases. IIUC we
>     only need
>     >>>>>>> to make sure that before the framework finishes dealing
>     with one
>     >> key,
>     >>> the
>     >>>>>>> operator doesn't see any data belonging to other keys, thus
>     >>>>>>> hash-based execution would also do the trick. One tricky
>     thing the
>     >>>>>>> framework might need to deal with is memory constraint and
>     spilling
>     >>>>>>> in the hash map, but Flink also has some good knowledge
>     about these
>     >>>>>> stuff.
>     >>>>>>> 2. Going back to sort-based execution and how to sort
>     keys. From my
>     >>>>>>> experience, the performance of sorting would be one of the most
>     >> important
>     >>>>>>> things if we want to achieve good performance of batch
>     execution.
>     >> And
>     >>>>>>> normalized keys are actually the key to the performance of
>     sorting.
>     >>>>>>> If we want to get rid of TypeComparator, I think we still
>     need to
>     >>> find a
>     >>>>>>> way to introduce this back.
>     >>>>>>>
>     >>>>>>> Best,
>     >>>>>>> Kurt
>     >>>>>>>
>     >>>>>>>
>     >>>>>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <aljoscha@apache.org> wrote:
>     >>>>>>>> Yes, I think we can address the problem of indeterminacy in a
>     >>> separate
>     >>>>>>>> FLIP because we're already in it.
>     >>>>>>>>
>     >>>>>>>> Aljoscha
>     >>>>>>>>
>     >>>>>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>     >>>>>>>>> @Seth That's a very good point. I agree that RocksDB has
>     the same
>     >>>>>>>>> problem. I think we can use the same approach for the sorted
>     >>> shuffles
>     >>>>>>>>> then. @Aljoscha I agree we should think about making it more
>     >>> resilient,
>     >>>>>>>>> as I guess users might have problems already if they use
>     keys with
>     >>>>>>>>> non-deterministic binary representation. How do you feel
>     about
>     >>>>>>>>> addressing that separately purely to limit the scope of
>     this FLIP?
>     >>>>>>>>>
>     >>>>>>>>> @Aljoscha I tend to agree with you that the best place
>     to actually
>     >>>>>> place
>     >>>>>>>>> the sorting would be in the InputProcessor(s). If there
>     are no
>     >> more
>     >>>>>>>>> suggestions in respect to that issue. I'll put this
>     proposal for
>     >>>>>> voting.
>     >>>>>>>>> @all Thank you for the feedback so far. I'd like to
>     start a voting
>     >>>>>>>>> thread on the proposal tomorrow. Therefore I'd
>     appreciate if you
>     >>>>>> comment
>     >>>>>>>>> before that, if you still have some outstanding ideas.
>     >>>>>>>>>
>     >>>>>>>>> Best,
>     >>>>>>>>>
>     >>>>>>>>> Dawid
>     >>>>>>>>>
>     >>>>>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>     >>>>>>>>>> Seth is right, I was just about to write that as well.
>     There is a
>     >>>>>>>>>> problem, though, because some of our TypeSerializers
>     are not
>     >>>>>>>>>> deterministic even though we use them as if they were. Beam
>     >>> excludes
>     >>>>>>>>>> the FloatCoder, for example, and the AvroCoder in
>     certain cases.
>     >>> I'm
>     >>>>>>>>>> pretty sure there is also weirdness going on in our
>     >> KryoSerializer.
>     >>>>>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>     >>>>>>>>>>> There is already an implicit assumption the
>     TypeSerializer for
>     >>> keys
>     >>>>>> is
>     >>>>>>>>>>> stable/deterministic, RocksDB compares keys using their
>     >> serialized
>     >>>>>> byte
>     >>>>>>>>>>> strings. I think this is a non-issue (or at least it's not
>     >>> changing
>     >>>>>> the
>     >>>>>>>>>>> status quo).
>     >>>>>>>>>>>
>     >>>>>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther
>     <twalthr@apache.org <ma...@apache.org>
>     >>>>>>>> wrote:
>     >>>>>>>>>>>> +1 for getting rid of the TypeComparator interface
>     and rely on
>     >>> the
>     >>>>>>>>>>>> serialized representation for grouping.
>     >>>>>>>>>>>>
>     >>>>>>>>>>>> Adding a new type to DataStream API is quite
>     difficult at the
>     >>> moment
>     >>>>>>>>>>>> due
>     >>>>>>>>>>>> to too many components that are required: TypeInformation
>     >> (tries
>     >>> to
>     >>>>>>>>>>>> deal
>     >>>>>>>>>>>> with logical fields for TypeComparators),
>     TypeSerializer (incl.
>     >>> it's
>     >>>>>>>>>>>> snapshot interfaces), and TypeComparator (with many
>     methods and
>     >>>>>>>>>>>> internals such as normalized keys etc.).
>     >>>>>>>>>>>>
>     >>>>>>>>>>>> If necessary, we can add more simple
>     comparison-related methods
>     >>> to
>     >>>>>> the
>     >>>>>>>>>>>> TypeSerializer interface itself in the future (like
>     >>>>>>>>>>>> TypeSerializer.isDeterministic).
>     >>>>>>>>>>>>
>     >>>>>>>>>>>> Regards,
>     >>>>>>>>>>>> Timo
>     >>>>>>>>>>>>
>     >>>>>>>>>>>>
>     >>>>>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>     >>>>>>>>>>>>> Thanks for publishing the FLIP!
>     >>>>>>>>>>>>>
>     >>>>>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <
>     >>> dwysakowicz@apache.org <ma...@apache.org>>
>     >>>>>>>>>>>>> wrote:
>     >>>>>>>>>>>>>>      1. How to sort/group keys? What representation
>     of the
>     >> key
>     >>>>>>>>>>>>>> should we
>     >>>>>>>>>>>>>>         use? Should we sort on the binary form or
>     should we
>     >>> depend
>     >>>>>> on
>     >>>>>>>>>>>>>>         Comparators being available.
>     >>>>>>>>>>>>> Initially, I suggested to Dawid (in private) to do the
>     >>>>>>>>>>>>> sorting/grouping
>     >>>>>>>>>>>> by using the binary representation. Then my opinion
>     switched
>     >> and
>     >>> I
>     >>>>>>>>>>>> thought
>     >>>>>>>>>>>> we should use TypeComparator/Comparator because
>     that's what the
>     >>>>>>>>>>>> DataSet API
>     >>>>>>>>>>>> uses. After talking to Stephan, I'm again encouraged
>     in my
>     >>> opinion
>     >>>>>>>>>>>> to use
>     >>>>>>>>>>>> the binary representation because it means we can
>     eventually
>     >> get
>     >>> rid
>     >>>>>>>>>>>> of the
>     >>>>>>>>>>>> TypeComparator interface, which is a bit complicated, and
>     >>> because we
>     >>>>>>>>>>>> don't
>     >>>>>>>>>>>> need any good order in our sort, we only need the
>     grouping.
>     >>>>>>>>>>>>> This comes with some problems, though: we need to
>     ensure that
>     >>> the
>     >>>>>>>>>>>> TypeSerializer of the type we're sorting is
>     >> stable/deterministic.
>     >>>>>>>>>>>> Beam has
>     >>>>>>>>>>>> infrastructure for this in the form of
>     >>> Coder.verifyDeterministic()
>     >>>>>> [1]
>     >>>>>>>>>>>> which we don't have right now and should add if we go
>     down this
>     >>>>>> path.
>     >>>>>>>>>>>>>>      2. Where in the stack should we apply the
>     sorting (this
>     >>>>>> rather a
>     >>>>>>>>>>>>>>         discussion about internals)
>     >>>>>>>>>>>>> Here, I'm gravitating towards the third option of
>     implementing
>     >>> it
>     >>>>>>>>>>>>> in the
>     >>>>>>>>>>>> layer of the StreamTask, which probably means
>     implementing a
>     >>> custom
>     >>>>>>>>>>>> InputProcessor. I think it's best to do it in this layer
>     >> because
>     >>> we
>     >>>>>>>>>>>> would
>     >>>>>>>>>>>> not mix concerns of different layers as we would if we
>     >>> implemented
>     >>>>>>>>>>>> this as
>     >>>>>>>>>>>> a custom StreamOperator. I think this solution is
>     also best
>     >> when
>     >>> it
>     >>>>>>>>>>>> comes
>     >>>>>>>>>>>> to multi-input operators.
>     >>>>>>>>>>>>>>      3. How should we deal with custom
>     implementations of
>     >>>>>>>>>>>>>> StreamOperators
>     >>>>>>>>>>>>> I think the cleanest solution would be to go through the
>     >>> complete
>     >>>>>>>>>>>> operator lifecycle for every key, because then the
>     watermark
>     >>> would
>     >>>>>> not
>     >>>>>>>>>>>> oscillate between -Inf and +Inf and we would not
>     break the
>     >>>>>> semantical
>     >>>>>>>>>>>> guarantees that we gave to operators so far, in that the
>     >>> watermark
>     >>>>>> is
>     >>>>>>>>>>>> strictly monotonically increasing. However, I don't
>     think this
>     >>>>>>>>>>>> solution is
>     >>>>>>>>>>>> feasible because it would come with too much overhead. We
>     >> should
>     >>>>>>>>>>>> solve this
>     >>>>>>>>>>>> problem via documentation and maybe educate people to
>     not query
>     >>> the
>     >>>>>>>>>>>> current
>     >>>>>>>>>>>> watermark or not rely on the watermark being
>     monotonically
>     >>>>>>>>>>>> increasing in
>     >>>>>>>>>>>> operator implementations to allow the framework more
>     freedoms
>     >> in
>     >>> how
>     >>>>>>>>>>>> user
>     >>>>>>>>>>>> programs are executed.
>     >>>>>>>>>>>>> Aljoscha
>     >>>>>>>>>>>>>
>     >>>>>>>>>>>>> [1]
>     >>
>     https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>     >>>
>

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Yu Li <ca...@gmail.com>.
Thanks for the clarification Dawid. Some of my thoughts:

*bq. The results are times for end-to-end execution of a job. Therefore the
sorting part is included. The actual target of the replacement is RocksDB,
which does the serialization and key bytes comparison as well.*
I see. Checking the FLIP more closely, I found the following description: "With a
high number of keys it (HeapStateBackend) suffers a significant penalty and
becomes even less performant for that particular case than the sorting
approach". Does this mean "HeapStateBackend" outperformed
"SingleKeyStateBackend" when the number of keys is relatively small? The
micro-benchmark of ValueState removes the key shuffling phase, so its
result is self-explanatory.

About `InternalKeyedStateBackend`, let me rephrase my question: why don't
we add the new state backend as shown below instead of adding a new interface
(and IMHO there's no need to implement the `SnapshotStrategy` and
`CheckpointListener` interfaces since it doesn't support checkpointing)?
Is the interface reserved for adding more internal state backends in the future?
===============================================
class BoundedStreamInternalStateBackend<K> implements
        KeyedStateBackend<K>,
        SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
        Closeable,
        CheckpointListener {
===============================================

*bq. Let me though quickly summarize and if you find it useful I can add it
to the FLIP itself.*
Thanks for the summary. I think it's more specific and could help readers
to better understand why we cannot use HeapKeyedStateBackend directly, than
the single line description "when the StateBackend observes a new incoming
key it will reset all acquired state objects so far". What do you think?
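
A minimal sketch of the behaviour described in that summary, assuming records
arrive already grouped by key. All names here (SingleKeyStateSketch,
SingleKeyBackend, SingleKeyValueState, sumGrouped) are hypothetical and are not
the classes proposed in the FLIP. The sketch also emits the result just before
the key switches, which is a simplification of the timer-based emission used in
the FLIP's example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical illustration only; none of these types are Flink classes. */
public class SingleKeyStateSketch {

    /** Thin wrapper around a single value that belongs to the current key. */
    public static final class SingleKeyValueState<T> {
        private T value;

        public T value() { return value; }

        public void update(T newValue) { value = newValue; }

        void clear() { value = null; }
    }

    /** Backend that keeps state for exactly one key at a time. */
    public static final class SingleKeyBackend<K> {
        private final List<SingleKeyValueState<?>> states = new ArrayList<>();
        private K currentKey;

        public <T> SingleKeyValueState<T> createValueState() {
            SingleKeyValueState<T> state = new SingleKeyValueState<>();
            states.add(state);
            return state;
        }

        /** Resets every acquired state object when a different key is observed. */
        public void setCurrentKey(K newKey) {
            if (!Objects.equals(currentKey, newKey)) {
                for (SingleKeyValueState<?> state : states) {
                    state.clear();
                }
                currentKey = newKey;
            }
        }

        public K getCurrentKey() { return currentKey; }
    }

    /** Sums values per key over input that is grouped by key; returns "key=sum" entries. */
    public static List<String> sumGrouped(String[] keys, int[] values) {
        SingleKeyBackend<String> backend = new SingleKeyBackend<>();
        SingleKeyValueState<Integer> counter = backend.createValueState();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            // Emit the finished key before the switch wipes its state.
            if (i > 0 && !keys[i].equals(keys[i - 1])) {
                out.add(keys[i - 1] + "=" + counter.value());
            }
            backend.setCurrentKey(keys[i]);
            Integer v = counter.value();
            counter.update(v == null ? values[i] : v + values[i]);
        }
        if (keys.length > 0) {
            out.add(keys[keys.length - 1] + "=" + counter.value());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sumGrouped(
                new String[] {"a", "a", "b", "c", "c"},
                new int[] {1, 2, 5, 3, 4}));
    }
}
```

Because only one key's state is ever held, memory use does not grow with the
number of keys. The flip side is that the sketch silently produces wrong
results if the input is not grouped, which is presumably why a
HeapKeyedStateBackend cannot simply be reused for this purpose.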

Thanks.

Best Regards,
Yu


On Thu, 17 Sep 2020 at 23:38, Dawid Wysakowicz <dw...@apache.org>
wrote:

> Thanks for the comments Yu.
>
> > First of all, for the performance testing result, I'm wondering whether
> the
> > sorting cost is counted in the result for both DataSet and refined
> > DataStream implementations. I could think of the saving of hash
> computation
> > and final iteration to emit the word-count result (processing a key at a
> > time could save such iteration), but not sure whether these cost savings
> > are at the same grade of comparing the key bytes.
> The results are times for end-to-end execution of a job. Therefore the
> sorting part is included. The actual target of the replacement is
> RocksDB, which does the serialization and key bytes comparison as well.
> On top of that it adds all the RocksDB bookkeeping.
>
> > However, I'm not fully convinced to introduce a new
> > `InternalKeyedStateBackend` interface. I agree that we don't need to take
> > the overhead of `AbstractKeyedStateBackend` since we don't plan to
> support
> > checkpoint for now, but why don't we directly write a state backend
> > implementation for bounded stream? Or are we planning to introduce more
> > internal state backends in future? What's more, the current design of
> > `InternalKeyedStateBackend` in the FLIP document seems to be extending as
> > many interfaces as `AbstractKeyedStateBackend` implements, which I
> guess
> > is a typo.
> Maybe I was not clear enough about the change. This change does not
> "strip" the AbstractKeyedStateBackend of any functionalities. My intent
> is not to remove any methods of the AbstractKeyedStateBackend. The
> problem here is that the AbstractKeyedStateBackend is an abstract class
> (duh ;)), which does have some predefined implementation. Moreover it
> requires objects such as InternalKeyContext, CloseableRegistry etc. to be
> constructed, which we don't need/want e.g. in the single key state
> backend. My intention here is to make the StateBackend return only pure
> interfaces. (AbstractKeyedStateBackend is the only non-interface that
> StateBackend returns). In other words I just want to make
> AbstractKeyedStateBackend a proper interface. It is not a typo that
> InternalKeyedStateBackend extends the same interfaces as
> AbstractKeyedStateBackend does.
>
> > Thirdly, I suggest we name the special state backend as
> > `BoundedStreamInternalStateBackend`. And from our existing javadoc of
> > `StateBackend` it actually cannot be called a complete state backend...:
> "A
> > State Backend defines how the state of a streaming application is stored
> > and checkpointed".
> Thanks for the suggestion. Sure I can use that name. Yes I do agree it
> is not a full fledged StateBackend. I do want it to be an internal
> class, that is never used explicitly by users.
>
> > Lastly, I didn't find a detailed design of the "SingleKeyStateBackend" in
> > the FLIP,
> I did not put it into the design, because 1) I found it internal. It
> does not touch any public facing interfaces. 2) It is rather
> straightforward. Let me though quickly summarize and if you find it
> useful I can add it to the FLIP itself.
>
> > as how to detect the key switching
> That is rather straightforward. The state backend works only with the
> assumption that the keys are sorted/grouped together. We keep the
> current key and in the setCurrentKey we check if the new key is
> different than the current one. Side note: yes, custom user operators
> which call setCurrentKey explicitly might not work in this setup.
>
> > remove the data (especially in the non-windowing
> > case), etc.
> We only ever keep a single value for a state object. Therefore
> ValueState is a very thin wrapper for a value, MapState for a HashMap,
> ListState for a List etc. When the key changes we simply set the wrapped
> value/map/state to null.
>
> I hope this clarifies a few things. Let me know if you have any questions.
>
> Best,
>
> Dawid
>
> On 17/09/2020 15:28, Yu Li wrote:
> > Hi all,
> >
> > Sorry for being late to the discussion, but I just noticed there are some
> > state backend related changes proposed in this FLIP, so would like to
> share
> > my two cents.
> >
> > First of all, for the performance testing result, I'm wondering whether
> the
> > sorting cost is counted in the result for both DataSet and refined
> > DataStream implementations. I could think of the saving of hash
> computation
> > and final iteration to emit the word-count result (processing a key at a
> > time could save such iteration), but not sure whether these cost savings
> > are at the same grade of comparing the key bytes.
> >
> > Regardless of the performance result, I agree that the capability of
> > removing the data after processing a key could prominently reduce the
> space
> > required by state, so introducing a new state backend for bounded stream
> > makes sense.
> >
> > However, I'm not fully convinced to introduce a new
> > `InternalKeyedStateBackend` interface. I agree that we don't need to take
> > the overhead of `AbstractKeyedStateBackend` since we don't plan to
> support
> > checkpoint for now, but why don't we directly write a state backend
> > implementation for bounded stream? Or are we planning to introduce more
> > internal state backends in future? What's more, the current design of
> > `InternalKeyedStateBackend` in the FLIP document seems to be extending as
> > > many interfaces as `AbstractKeyedStateBackend` implements, which I
> guess
> > is a typo.
> >
> > Thirdly, I suggest we name the special state backend as
> > `BoundedStreamInternalStateBackend`. And from our existing javadoc of
> > `StateBackend` it actually cannot be called a complete state backend...:
> "A
> > State Backend defines how the state of a streaming application is stored
> > and checkpointed".
> >
> > Lastly, I didn't find a detailed design of the "SingleKeyStateBackend" in
> > the FLIP, and suggest we write the key design down, such as how to detect
> > the key switching and remove the data (especially in the non-windowing
> > case), etc.
> >
> > Thanks.
> >
> > Best Regards,
> > Yu
> >
> >
> > On Wed, 9 Sep 2020 at 17:18, Kurt Young <yk...@gmail.com> wrote:
> >
> >> Yes, I didn't intend to block this FLIP, and some of the comments are
> >> actually implementation details.
> >> And all of them are handled internally, not visible to users, thus we
> can
> >> also change or improve them
> >> in the future.
> >>
> >> Best,
> >> Kurt
> >>
> >>
> >> On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek <al...@apache.org>
> >> wrote:
> >>
> >>> I think Kurt's concerns/comments are very valid and we need to implement
> >>> such things in the future. However, I also think that we need to get
> >>> started somewhere and I think what's proposed in this FLIP is a good
> >>> starting point that we can build on. So we should not get paralyzed by
> >>> thinking too far ahead into the future. Does that make sense?
> >>>
> >>> Best,
> >>> Aljoscha
> >>>
> >>> On 08.09.20 16:59, Dawid Wysakowicz wrote:
> >>>> Ad. 1
> >>>>
> >>>> Yes, you are right in principle.
> >>>>
> >>>> Let me though clarify my proposal a bit. The proposed sort-style
> >>>> execution aims at a generic KeyedProcessFunction where all the
> >>>> "aggregations" are actually performed in the user code. It tries to
> >>>> improve the performance by actually removing the need to use RocksDB
> >>> e.g.:
> >>>>      private static final class Summer<K>
> >>>>              extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
> >>>>
> >>>>          ....
> >>>>
> >>>>          @Override
> >>>>          public void processElement(
> >>>>                  Tuple2<K, Integer> value,
> >>>>                  Context ctx,
> >>>>                  Collector<Tuple2<K, Integer>> out) throws Exception {
> >>>>              if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
> >>>>                  ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
> >>>>                  timerRegistered.update(true);
> >>>>              }
> >>>>              Integer v = counter.value();
> >>>>              Integer incomingValue = value.f1;
> >>>>              if (v != null) {
> >>>>                  v += incomingValue;
> >>>>              } else {
> >>>>                  v = incomingValue;
> >>>>              }
> >>>>              counter.update(v);
> >>>>          }
> >>>>
> >>>>          ....
> >>>>
> >>>>     }
> >>>>
> >>>> Therefore I don't think the first part of your reply with separating
> >> the
> >>>> write and read workload applies here. We do not aim to create a
> >>>> competing API with the Table API. We think operations such as joins or
> >>>> analytical aggregations should be performed in Table API.
> >>>>
> >>>> As for the second part I agree it would be nice to fall back to the
> >>>> sorting approach only if a certain threshold of memory in a State
> >>>> Backend is used. This has some problems though. We would need a way to
> >>>> estimate the size of the occupied memory to tell when the threshold is
> >>>> reached. That is not easily doable by default e.g. in a
> >>>> MemoryStateBackend, as we do not serialize the values in the state
> >>>> backend by default. We would have to add that, but this would add the
> >>>> overhead of the serialization.
> >>>>
> >>>> This proposal aims at the cases where we do have a large state that
> >> will
> >>>> not fit into the memory and without the change users are forced to use
> >>>> RocksDB. If the state fits in memory I agree it will be better to do
> >>>> hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
> >>>> think it is important to give users the choice to use one or the other
> >>>> approach. We might discuss which approach should be the default for
> >>>> RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
> >>>> user configured state backend or sorting-based with a single key at a
> >>>> time backend. Moreover we could think if we should let users choose
> the
> >>>> sort vs hash "state backend" per operator. Would that suffice?
> >>>>
> >>>> Ad. 2
> >>>>
> >>>> I still think we can just use the first X bytes of the serialized form
> >>>> as the normalized key and fallback to comparing full keys on clashes.
> >> It
> >>>> is because we are actually not interested in a logical order, but we
> >>>> care only about the "grouping" aspect of the sorting. Therefore I
> think
> >>>> it's enough to compare only parts of the full key as the normalized
> key.
> >>>>
> >>>> Thanks again for the really nice and thorough feedback!
> >>>>
> >>>> Best,
> >>>>
> >>>> Dawid
> >>>>
> >>>> On 08/09/2020 14:47, Kurt Young wrote:
> >>>>> Regarding #1, yes the state backend is definitely hash-based
> >> execution.
> >>>>> However there are some differences from
> >>>>> batch hash-based execution. The key difference is *random access &
> >>>>> read/write mixed workload*. For example, by using
> >>>>> state backend in streaming execution, one has to mix the read and
> >> write
> >>>>> operations and all of them are actually random
> >>>>> access. But in a batch hash execution, we could divide the phases
> into
> >>>>> write and read. For example, we can build the
> >>>>> hash table first, with only write operations. And once the build is
> >>> done,
> >>>>> we can start to read and trigger the user codes.
> >>>>> Take hash aggregation which blink planner implemented as an example,
> >>> during
> >>>>> building phase, as long as the hash map
> >>>>> could fit into memory, we will update the accumulators directly in
> the
> >>> hash
> >>>>> map. And once we are running out of memory,
> >>>>> we then fall back to sort based execution. It improves the
> >> performance a
> >>>>> lot if the incoming data can be processed in
> >>>>> memory.
> >>>>>
> >>>>> Regarding #2, IIUC you are actually describing a binary format of
> key,
> >>> not
> >>>>> normalized key which is used in DataSet. I will
> >>>>> take String for example. If we have lots of keys with length all
> >> greater
> >>>>> than, let's say 20. In your proposal, you will encode
> >>>>> the whole string in the prefix of your composed data ( <key> +
> >>> <timestamp>
> >>>>> + <record> ). And when you compare
> >>>>> records, you will actually compare the *whole* key of the record. For
> >>>>> normalized key, it's fixed-length in this case, IIRC it will
> >>>>> take 8 bytes to represent the string. And the sorter will store the
> >>>>> normalized key and offset in a dedicated array. When doing
> >>>>> the sorting, it only sorts this *small* array. If the normalized keys
> >>> are
> >>>>> different, you could immediately tell which is greater from
> >>>>> normalized keys. You only have to compare the full keys if the
> >>> normalized
> >>>>> keys are equal and you know in this case the normalized
> >>>>> key couldn't represent the full key. The reason why Dataset is doing
> >>> this
> >>>>> is it's super cache efficient by sorting the *small* array.
> >>>>> The idea is borrowed from this paper [1]. Let me know if I missed or
> >>>>> misunderstood anything.
> >>>>>
> >>>>> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
> >>>>> cache-sensitive parallel external sort)
> >>>>>
> >>>>> Best,
> >>>>> Kurt
> >>>>>
> >>>>>
> >>>>> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <
> >> dwysakowicz@apache.org
> >>>>> wrote:
> >>>>>
> >>>>>> Hey Kurt,
> >>>>>>
> >>>>>> Thank you for comments!
> >>>>>>
> >>>>>> Ad. 1 I might have missed something here, but as far as I see it is
> >>> that
> >>>>>> using the current execution stack with regular state backends
> >> (RocksDB
> >>>>>> in particular if we want to have spilling capabilities) is
> equivalent
> >>> to
> >>>>>> hash-based execution. I can see a different spilling state backend
> >>>>>> implementation in the future, but I think it is not batch specific.
> Or
> >>> am
> >>>>>> I missing something?
> >>>>>>
> >>>>>> Ad. 2 Totally agree that normalized keys are important to the
> >>>>>> performance. I think though TypeComparators are not a necessity to
> >> have
> >>>>>> that. Actually  this proposal is heading towards only ever
> performing
> >>>>>> "normalized keys" comparison. I have not included in the proposal
> the
> >>>>>> binary format which we will use for sorting (partially because I
> >>> forgot,
> >>>>>> and partially because I thought it was too much of an implementation
> >>>>>> detail). Let me include it here though, as it might clear the
> >> situation
> >>>>>> a bit here.
> >>>>>>
> >>>>>> In DataSet, at times we have KeySelectors which extract keys based
> on
> >>>>>> field indices or names. This allows in certain situation to extract
> >> the
> >>>>>> key from serialized records. Compared to DataSet, in DataStream, the
> >>> key
> >>>>>> is always described with a black-box KeySelector, or differently
> >> with a
> >>>>>> function which extracts a key from a deserialized record.  In turn
> >>> there
> >>>>>> is no way to create a comparator that could compare records by
> >>>>>> extracting the key from a serialized record (neither with, nor
> >> without
> >>>>>> key normalization). We suggest that the input for the sorter will be
> >>>>>>
> >>>>>> <key> + <timestamp> + <record>
> >>>>>>
> >>>>>> Without having the key prepended we would have to deserialize the
> >>> record
> >>>>>> for every key comparison.
> >>>>>>
> >>>>>> Therefore if we agree that we perform binary comparison for keys
> >> (which
> >>>>>> are always prepended), it is actually equivalent to a DataSet with
> >>>>>> TypeComparators that support key normalization.
> >>>>>>
> >>>>>> Let me know if that is clear, or I have missed something here.
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Dawid
> >>>>>>
> >>>>>> On 08/09/2020 03:39, Kurt Young wrote:
> >>>>>>> Hi Dawid, thanks for bringing this up, it's really exciting to see
> >>> that
> >>>>>>> batch execution is introduced in DataStream. From the flip, it
> seems
> >>>>>>> we are sticking with sort based execution mode (at least for now),
> >>> which
> >>>>>>> will sort the whole input data before any *keyed* operation is
> >>>>>>> executed. I have two comments here:
> >>>>>>>
> >>>>>>> 1. Do we want to introduce hash-based execution in the future? Sort
> >>> is a
> >>>>>>> safe choice but not the best in lots of cases. IIUC we only need
> >>>>>>> to make sure that before the framework finishes dealing with one
> >> key,
> >>> the
> >>>>>>> operator doesn't see any data belonging to other keys, thus
> >>>>>>> hash-based execution would also do the trick. One tricky thing the
> >>>>>>> framework might need to deal with is memory constraint and spilling
> >>>>>>> in the hash map, but Flink also has some good knowledge about these
> >>>>>> stuff.
> >>>>>>> 2. Going back to sort-based execution and how to sort keys. From my
> >>>>>>> experience, the performance of sorting would be one of the most
> >> important
> >>>>>>> things if we want to achieve good performance of batch execution.
> >> And
> >>>>>>> normalized keys are actually the key of the performance of sorting.
> >>>>>>> If we want to get rid of TypeComparator, I think we still need to
> >>> find a
> >>>>>>> way to introduce this back.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Kurt
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <
> >> aljoscha@apache.org>
> >>>>>> wrote:
> >>>>>>>> Yes, I think we can address the problem of indeterminacy in a
> >>> separate
> >>>>>>>> FLIP because we're already in it.
> >>>>>>>>
> >>>>>>>> Aljoscha
> >>>>>>>>
> >>>>>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
> >>>>>>>>> @Seth That's a very good point. I agree that RocksDB has the same
> >>>>>>>>> problem. I think we can use the same approach for the sorted
> >>> shuffles
> >>>>>>>>> then. @Aljoscha I agree we should think about making it more
> >>> resilient,
> >>>>>>>>> as I guess users might have problems already if they use keys
> with
> >>>>>>>>> non-deterministic binary representation. How do you feel about
> >>>>>>>>> addressing that separately purely to limit the scope of this
> FLIP?
> >>>>>>>>>
> >>>>>>>>> @Aljoscha I tend to agree with you that the best place to
> actually
> >>>>>> place
> >>>>>>>>> the sorting would be in the InputProcessor(s). If there are no
> >> more
> >>>>>>>>> suggestions in respect to that issue. I'll put this proposal for
> >>>>>> voting.
> >>>>>>>>> @all Thank you for the feedback so far. I'd like to start a
> voting
> >>>>>>>>> thread on the proposal tomorrow. Therefore I'd appreciate if you
> >>>>>> comment
> >>>>>>>>> before that, if you still have some outstanding ideas.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>>
> >>>>>>>>> Dawid
> >>>>>>>>>
> >>>>>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
> >>>>>>>>>> Seth is right, I was just about to write that as well. There is
> a
> >>>>>>>>>> problem, though, because some of our TypeSerializers are not
> >>>>>>>>>> deterministic even though we use them as if they were. Beam
> >>> excludes
> >>>>>>>>>> the FloatCoder, for example, and the AvroCoder in certain cases.
> >>> I'm
> >>>>>>>>>> pretty sure there is also weirdness going on in our
> >> KryoSerializer.
> >>>>>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
> >>>>>>>>>>> There is already an implicit assumption the TypeSerializer for
> >>> keys
> >>>>>> is
> >>>>>>>>>>> stable/deterministic, RocksDB compares keys using their
> >> serialized
> >>>>>> byte
> >>>>>>>>>>> strings. I think this is a non-issue (or at least it's not
> >>> changing
> >>>>>> the
> >>>>>>>>>>> status quo).
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <
> twalthr@apache.org
> >>>>>>>> wrote:
> >>>>>>>>>>>> +1 for getting rid of the TypeComparator interface and rely on
> >>> the
> >>>>>>>>>>>> serialized representation for grouping.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Adding a new type to DataStream API is quite difficult at the
> >>> moment
> >>>>>>>>>>>> due
> >>>>>>>>>>>> to too many components that are required: TypeInformation
> >> (tries
> >>> to
> >>>>>>>>>>>> deal
> >>>>>>>>>>>> with logical fields for TypeComparators), TypeSerializer
> (incl.
> >>> its
> >>>>>>>>>>>> snapshot interfaces), and TypeComparator (with many methods
> and
> >>>>>>>>>>>> internals such as normalized keys etc.).
> >>>>>>>>>>>>
> >>>>>>>>>>>> If necessary, we can add more simple comparison-related
> methods
> >>> to
> >>>>>> the
> >>>>>>>>>>>> TypeSerializer interface itself in the future (like
> >>>>>>>>>>>> TypeSerializer.isDeterministic).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
> >>>>>>>>>>>>> Thanks for publishing the FLIP!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <
> >>> dwysakowicz@apache.org>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>      1. How to sort/group keys? What representation of the
> >> key
> >>>>>>>>>>>>>> should we
> >>>>>>>>>>>>>>         use? Should we sort on the binary form or should we
> >>> depend
> >>>>>> on
> >>>>>>>>>>>>>>         Comparators being available.
> >>>>>>>>>>>>> Initially, I suggested to Dawid (in private) to do the
> >>>>>>>>>>>>> sorting/grouping
> >>>>>>>>>>>> by using the binary representation. Then my opinion switched
> >> and
> >>> I
> >>>>>>>>>>>> thought
> >>>>>>>>>>>> we should use TypeComparator/Comparator because that's what
> the
> >>>>>>>>>>>> DataSet API
> >>>>>>>>>>>> uses. After talking to Stephan, I'm again encouraged in my
> >>> opinion
> >>>>>>>>>>>> to use
> >>>>>>>>>>>> the binary representation because it means we can eventually
> >> get
> >>> rid
> >>>>>>>>>>>> of the
> >>>>>>>>>>>> TypeComparator interface, which is a bit complicated, and
> >>> because we
> >>>>>>>>>>>> don't
> >>>>>>>>>>>> need any good order in our sort, we only need the grouping.
> >>>>>>>>>>>>> This comes with some problems, though: we need to ensure that
> >>> the
> >>>>>>>>>>>> TypeSerializer of the type we're sorting is
> >> stable/deterministic.
> >>>>>>>>>>>> Beam has
> >>>>>>>>>>>> infrastructure for this in the form of
> >>> Coder.verifyDeterministic()
> >>>>>> [1]
> >>>>>>>>>>>> which we don't have right now and should add if we go down
> this
> >>>>>> path.
> >>>>>>>>>>>>>>      2. Where in the stack should we apply the sorting (this
> >>>>>> rather a
> >>>>>>>>>>>>>>         discussion about internals)
> >>>>>>>>>>>>> Here, I'm gravitating towards the third option of
> implementing
> >>> it
> >>>>>>>>>>>>> in the
> >>>>>>>>>>>> layer of the StreamTask, which probably means implementing a
> >>> custom
> >>>>>>>>>>>> InputProcessor. I think it's best to do it in this layer
> >> because
> >>> we
> >>>>>>>>>>>> would
> >>>>>>>>>>>> not mix concerns of different layers as we would if we
> >>> implemented
> >>>>>>>>>>>> this as
> >>>>>>>>>>>> a custom StreamOperator. I think this solution is also best
> >> when
> >>> it
> >>>>>>>>>>>> comes
> >>>>>>>>>>>> to multi-input operators.
> >>>>>>>>>>>>>>      3. How should we deal with custom implementations of
> >>>>>>>>>>>>>> StreamOperators
> >>>>>>>>>>>>> I think the cleanest solution would be to go through the
> >>> complete
> >>>>>>>>>>>> operator lifecycle for every key, because then the watermark
> >>> would
> >>>>>> not
> >>>>>>>>>>>> oscillate between -Inf and +Inf and we would not break the
> >>>>>> semantical
> >>>>>>>>>>>> guarantees that we gave to operators so far, in that the
> >>> watermark
> >>>>>> is
> >>>>>>>>>>>> strictly monotonically increasing. However, I don't think this
> >>>>>>>>>>>> solution is
> >>>>>>>>>>>> feasible because it would come with too much overhead. We
> >> should
> >>>>>>>>>>>> solve this
> >>>>>>>>>>>> problem via documentation and maybe educate people to not
> query
> >>> the
> >>>>>>>>>>>> current
> >>>>>>>>>>>> watermark or not rely on the watermark being monotonically
> >>>>>>>>>>>> increasing in
> >>>>>>>>>>>> operator implementations to allow the framework more freedoms
> >> in
> >>> how
> >>>>>>>>>>>> user
> >>>>>>>>>>>> programs are executed.
> >>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]
> >>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
> >>>
>
>

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Dawid Wysakowicz <dw...@apache.org>.
Thanks for the comments Yu.

> First of all, for the performance testing result, I'm wondering whether the
> sorting cost is counted in the result for both DataSet and refined
> DataStream implementations. I could think of the saving of hash computation
> and final iteration to emit the word-count result (processing a key at a
> time could save such iteration), but not sure whether these cost savings
> are at the same grade of comparing the key bytes.
The results are times for end-to-end execution of a job. Therefore the
sorting part is included. The actual target of the replacement is
RocksDB, which does the serialization and key bytes comparison as well.
On top of that it adds all the RocksDB bookkeeping.

> However, I'm not fully convinced to introduce a new
> `InternalKeyedStateBackend` interface. I agree that we don't need to take
> the overhead of `AbstractKeyedStateBackend` since we don't plan to support
> checkpoint for now, but why don't we directly write a state backend
> implementation for bounded stream? Or are we planning to introduce more
> internal state backends in future? What's more, the current design of
> `InternalKeyedStateBackend` in the FLIP document seems to be extending as
> many interfaces as `AbstractKeyedStateBackend` implements, which I guess
> is a typo.
Maybe I was not clear enough about the change. This change does not
"strip" the AbstractKeyedStateBackend of any functionalities. My intent
is not to remove any methods of the AbstractKeyedStateBackend. The
problem here is that the AbstractKeyedStateBackend is an abstract class
(duh ;)), which does have some predefined implementation. Moreover it
requires objects such as InternalKeyContext, CloseableRegistry etc. to be
constructed, which we don't need/want e.g. in the single key state
backend. My intention here is to make the StateBackend return only pure
interfaces. (AbstractKeyedStateBackend is the only non-interface that
StateBackend returns). In other words I just want to make
AbstractKeyedStateBackend a proper interface. It is not a typo that
InternalKeyedStateBackend extends the same interfaces as
AbstractKeyedStateBackend does.

> Thirdly, I suggest we name the special state backend as
> `BoundedStreamInternalStateBackend`. And from our existing javadoc of
> `StateBackend` it actually cannot be called a complete state backend...: "A
> State Backend defines how the state of a streaming application is stored
> and checkpointed".
Thanks for the suggestion. Sure, I can use that name. Yes, I do agree it
is not a full-fledged StateBackend. I do want it to be an internal
class that is never used explicitly by users.

> Lastly, I didn't find a detailed design of the "SingleKeyStateBackend" in
> the FLIP,
I did not put it into the design because 1) I consider it internal: it
does not touch any public-facing interfaces, and 2) it is rather
straightforward. Let me quickly summarize it, though, and if you find it
useful I can add it to the FLIP itself.

> as how to detect the key switching
That is rather straightforward. The state backend works only with the
assumption that the keys are sorted/grouped together. We keep the
current key and in the setCurrentKey we check if the new key is
different than the current one. Side note: yes, custom user operators
which call setCurrentKey explicitly might not work in this setup.

> remove the data (especially in the non-windowing
> case), etc.
We only ever keep a single value for a state object. Therefore
ValueState is a very thin wrapper for a value, MapState for a HashMap,
ListState for a List etc. When the key changes we simply set the wrapped
value/map/list to null.
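
To make the two answers above more concrete, here is a minimal sketch of
the idea. It is not the code from the FLIP: SingleKeyValueState and
SingleKeyDemo are hypothetical names used only for illustration, and the
sketch assumes the input already arrives grouped by key. It shows the key
switch being detected in setCurrentKey and the wrapped value being dropped
at that point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch: holds the state of exactly one key at a time. */
final class SingleKeyValueState<K, T> {
    private K currentKey; // key of the group currently being processed
    private T value;      // the single wrapped value, null when empty

    /** Called before each record; drops the state when the key changes. */
    void setCurrentKey(K newKey) {
        if (!Objects.equals(currentKey, newKey)) {
            currentKey = newKey;
            value = null; // nothing is kept for the previous key
        }
    }

    K getCurrentKey() { return currentKey; }

    T value() { return value; }

    void update(T newValue) { value = newValue; }
}

/** Hypothetical driver that sums values per key over grouped input. */
final class SingleKeyDemo {
    static List<String> sumGrouped(String[] keys, int[] values) {
        SingleKeyValueState<String, Integer> counter = new SingleKeyValueState<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            // Emit the finished group before its state is reset.
            if (i > 0 && !keys[i].equals(keys[i - 1])) {
                out.add(keys[i - 1] + "=" + counter.value());
            }
            counter.setCurrentKey(keys[i]);
            Integer v = counter.value();
            counter.update(v == null ? values[i] : v + values[i]);
        }
        if (keys.length > 0) {
            out.add(keys[keys.length - 1] + "=" + counter.value());
        }
        return out;
    }
}
```

Note that the sketch also illustrates the side note above: if a caller
switches to another key and then back, the state of the first key is gone.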

I hope this clarifies a few things. Let me know if you have any questions.

Best,

Dawid

On 17/09/2020 15:28, Yu Li wrote:
> Hi all,
>
> Sorry for being late to the discussion, but I just noticed there are some
> state backend related changes proposed in this FLIP, so would like to share
> my two cents.
>
> First of all, for the performance testing result, I'm wondering whether the
> sorting cost is counted in the result for both DataSet and refined
> DataStream implementations. I could think of the saving of hash computation
> and final iteration to emit the word-count result (processing a key at a
> time could save such iteration), but not sure whether these cost savings
> are at the same grade of comparing the key bytes.
>
> Regardless of the performance result, I agree that the capability of
> removing the data after processing a key could prominently reduce the space
> required by state, so introducing a new state backend for bounded stream
> makes sense.
>
> However, I'm not fully convinced to introduce a new
> `InternalKeyedStateBackend` interface. I agree that we don't need to take
> the overhead of `AbstractKeyedStateBackend` since we don't plan to support
> checkpoint for now, but why don't we directly write a state backend
> implementation for bounded stream? Or are we planning to introduce more
> internal state backends in future? What's more, the current design of
> `InternalKeyedStateBackend` in the FLIP document seems to be extending as
> many interfaces as `AbstractKeyedStateBackend` implements, which I guess
> is a typo.
>
> Thirdly, I suggest we name the special state backend as
> `BoundedStreamInternalStateBackend`. And from our existing javadoc of
> `StateBackend` it actually cannot be called a complete state backend...: "A
> State Backend defines how the state of a streaming application is stored
> and checkpointed".
>
> Lastly, I didn't find a detailed design of the "SingleKeyStateBackend" in
> the FLIP, and suggest we write the key design down, such as how to detect
> the key switching and remove the data (especially in the non-windowing
> case), etc.
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Wed, 9 Sep 2020 at 17:18, Kurt Young <yk...@gmail.com> wrote:
>
>> Yes, I didn't intend to block this FLIP, and some of the comments are
>> actually implementation details.
>> And all of them are handled internally, not visible to users, thus we can
>> also change or improve them
>> in the future.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> I think Kurt's concerns/comments are very valid and we need to implement
>>> such things in the future. However, I also think that we need to get
>>> started somewhere and I think what's proposed in this FLIP is a good
>>> starting point that we can build on. So we should not get paralyzed by
>>> thinking too far ahead into the future. Does that make sense?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 08.09.20 16:59, Dawid Wysakowicz wrote:
>>>> Ad. 1
>>>>
>>>> Yes, you are right in principle.
>>>>
>>>> Let me though clarify my proposal a bit. The proposed sort-style
>>>> execution aims at a generic KeyedProcessFunction where all the
>>>> "aggregations" are actually performed in the user code. It tries to
>>>> improve the performance by actually removing the need to use RocksDB
>>> e.g.:
>>>>      private static final class Summer<K>
>>>>              extends KeyedProcessFunction<K, Tuple2<K, Integer>,
>>>> Tuple2<K, Integer>> {
>>>>
>>>>          ....
>>>>
>>>>          @Override
>>>>          public void processElement(
>>>>                  Tuple2<K, Integer> value,
>>>>                  Context ctx,
>>>>                  Collector<Tuple2<K, Integer>> out) throws Exception {
>>>>              if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>>>>                  ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>>>>                  timerRegistered.update(true);
>>>>              }
>>>>              Integer v = counter.value();
>>>>              Integer incomingValue = value.f1;
>>>>              if (v != null) {
>>>>                  v += incomingValue;
>>>>              } else {
>>>>                  v = incomingValue;
>>>>              }
>>>>              counter.update(v);
>>>>          }
>>>>
>>>>          ....
>>>>
>>>>     }
>>>>
>>>> Therefore I don't think the first part of your reply with separating
>> the
>>>> write and read workload applies here. We do not aim to create a
>>>> competing API with the Table API. We think operations such as joins or
>>>> analytical aggregations should be performed in Table API.
>>>>
>>>> As for the second part I agree it would be nice to fall back to the
>>>> sorting approach only if a certain threshold of memory in a State
>>>> Backend is used. This has some problems though. We would need a way to
>>>> estimate the size of the occupied memory to tell when the threshold is
>>>> reached. That is not easily doable by default e.g. in a
>>>> MemoryStateBackend, as we do not serialize the values in the state
>>>> backend by default. We would have to add that, but this would add the
>>>> overhead of the serialization.
>>>>
>>>> This proposal aims at the cases where we do have a large state that
>> will
>>>> not fit into the memory and without the change users are forced to use
>>>> RocksDB. If the state fits in memory I agree it will be better to do
>>>> hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
>>>> think it is important to give users the choice to use one or the other
>>>> approach. We might discuss which approach should be the default for
>>>> RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
>>>> user configured state backend or sorting-based with a single key at a
>>>> time backend. Moreover we could think if we should let users choose the
>>>> sort vs hash "state backend" per operator. Would that suffice?
>>>>
>>>> Ad. 2
>>>>
>>>> I still think we can just use the first X bytes of the serialized form
>>>> as the normalized key and fallback to comparing full keys on clashes.
>> It
>>>> is because we are actually not interested in a logical order, but we
>>>> care only about the "grouping" aspect of the sorting. Therefore I think
>>>> it's enough to compare only parts of the full key as the normalized key.
>>>>
>>>> Thanks again for the really nice and thorough feedback!
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 08/09/2020 14:47, Kurt Young wrote:
>>>>> Regarding #1, yes the state backend is definitely hash-based
>> execution.
>>>>> However there are some differences between
>>>>> batch hash-based execution. The key difference is *random access &
>>>>> read/write mixed workload*. For example, by using
>>>>> state backend in streaming execution, one have to mix the read and
>> write
>>>>> operations and all of them are actually random
>>>>> access. But in a batch hash execution, we could divide the phases into
>>>>> write and read. For example, we can build the
>>>>> hash table first, with only write operations. And once the build is
>>> done,
>>>>> we can start to read and trigger the user codes.
>>>>> Take hash aggregation which blink planner implemented as an example,
>>> during
>>>>> building phase, as long as the hash map
>>>>> could fit into memory, we will update the accumulators directly in the
>>> hash
>>>>> map. And once we are running out of memory,
>>>>> we then fall back to sort based execution. It improves the
>> performance a
>>>>> lot if the incoming data can be processed in
>>>>> memory.
>>>>>
>>>>> Regarding #2, IIUC you are actually describing a binary format of key,
>>> not
>>>>> normalized key which is used in DataSet. I will
>>>>> take String for example. If we have lots of keys with length all
>> greater
>>>>> than, let's say 20. In your proposal, you will encode
>>>>> the whole string in the prefix of your composed data ( <key> +
>>> <timestamp>
>>>>> + <record> ). And when you compare
>>>>> records, you will actually compare the *whole* key of the record. For
>>>>> normalized key, it's fixed-length in this case, IIRC it will
>>>>> take 8 bytes to represent the string. And the sorter will store the
>>>>> normalized key and offset in a dedicated array. When doing
>>>>> the sorting, it only sorts this *small* array. If the normalized keys
>>> are
>>>>> different, you could immediately tell which is greater from
>>>>> normalized keys. You only have to compare the full keys if the
>>> normalized
>>>>> keys are equal and you know in this case the normalized
>>>>> key couldn't represent the full key. The reason why Dataset is doing
>>> this
>>>>> is it's super cache efficient by sorting the *small* array.
>>>>> The idea is borrowed from this paper [1]. Let me know if I missed or
>>>>> misunderstood anything.
>>>>>
>>>>> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
>>>>> cache-sensitive parallel external sort)
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <
>> dwysakowicz@apache.org
>>>>> wrote:
>>>>>
>>>>>> Hey Kurt,
>>>>>>
>>>>>> Thank you for comments!
>>>>>>
>>>>>> Ad. 1 I might have missed something here, but as far as I see it is
>>> that
>>>>>> using the current execution stack with regular state backends
>> (RocksDB
>>>>>> in particular if we want to have spilling capabilities) is equivalent
>>> to
>>>>>> hash-based execution. I can see a different spilling state backend
>>>>>> implementation in the future, but I think it is not batch specific. Or
>>> am
>>>>>> I missing something?
>>>>>>
>>>>>> Ad. 2 Totally agree that normalized keys are important to the
>>>>>> performance. I think though TypeComparators are not a necessity to
>> have
>>>>>> that. Actually  this proposal is heading towards only ever performing
>>>>>> "normalized keys" comparison. I have not included in the proposal the
>>>>>> binary format which we will use for sorting (partially because I
>>> forgot,
>>>>>> and partially because I thought it was too much of an implementation
>>>>>> detail). Let me include it here though, as it might clear the
>> situation
>>>>>> a bit here.
>>>>>>
>>>>>> In DataSet, at times we have KeySelectors which extract keys based on
>>>>>> field indices or names. This allows in certain situation to extract
>> the
>>>>>> key from serialized records. Compared to DataSet, in DataStream, the
>>> key
>>>>>> is always described with a black-box KeySelector, or differently
>> with a
>>>>>> function which extracts a key from a deserialized record.  In turn
>>> there
>>>>>> is no way to create a comparator that could compare records by
>>>>>> extracting the key from a serialized record (neither with, nor
>> without
>>>>>> key normalization). We suggest that the input for the sorter will be
>>>>>>
>>>>>> <key> + <timestamp> + <record>
>>>>>>
>>>>>> Without having the key prepended we would have to deserialize the
>>> record
>>>>>> for every key comparison.
>>>>>>
>>>>>> Therefore if we agree that we perform binary comparison for keys
>> (which
>>>>>> are always prepended), it is actually equivalent to a DataSet with
>>>>>> TypeComparators that support key normalization.
>>>>>>
>>>>>> Let me know if that is clear, or I have missed something here.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> On 08/09/2020 03:39, Kurt Young wrote:
>>>>>>> Hi Dawid, thanks for bringing this up, it's really exciting to see
>>> that
>>>>>>> batch execution is introduced in DataStream. From the flip, it seems
>>>>>>> we are sticking with sort based execution mode (at least for now),
>>> which
>>>>>>> will sort the whole input data before any *keyed* operation is
>>>>>>> executed. I have two comments here:
>>>>>>>
>>>>>>> 1. Do we want to introduce hash-based execution in the future? Sort
>>> is a
>>>>>>> safe choice but not the best in lots of cases. IIUC we only need
>>>>>>> to make sure that before the framework finishes dealing with one
>> key,
>>> the
>>>>>>> operator doesn't see any data belonging to other keys, thus
>>>>>>> hash-based execution would also do the trick. One tricky thing the
>>>>>>> framework might need to deal with is memory constraint and spilling
>>>>>>> in the hash map, but Flink also has some good knowledge about these
>>>>>> stuff.
>>>>>>> 2. Going back to sort-based execution and how to sort keys. From my
>>>>>>> experience, the performance of sorting would be one of the most
>> important
>>>>>>> things if we want to achieve good performance of batch execution.
>> And
>>>>>>> normalized keys are actually the key of the performance of sorting.
>>>>>>> If we want to get rid of TypeComparator, I think we still need to
>>> find a
>>>>>>> way to introduce this back.
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <
>> aljoscha@apache.org>
>>>>>> wrote:
>>>>>>>> Yes, I think we can address the problem of indeterminacy in a
>>> separate
>>>>>>>> FLIP because we're already in it.
>>>>>>>>
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>>>>>>>>> @Seth That's a very good point. I agree that RocksDB has the same
>>>>>>>>> problem. I think we can use the same approach for the sorted
>>> shuffles
>>>>>>>>> then. @Aljoscha I agree we should think about making it more
>>> resilient,
>>>>>>>>> as I guess users might have problems already if they use keys with
>>>>>>>>> non-deterministic binary representation. How do you feel about
>>>>>>>>> addressing that separately purely to limit the scope of this FLIP?
>>>>>>>>>
>>>>>>>>> @Aljoscha I tend to agree with you that the best place to actually
>>>>>> place
>>>>>>>>> the sorting would be in the InputProcessor(s). If there are no
>> more
>>>>>>>>> suggestions in respect to that issue. I'll put this proposal for
>>>>>> voting.
>>>>>>>>> @all Thank you for the feedback so far. I'd like to start a voting
>>>>>>>>> thread on the proposal tomorrow. Therefore I'd appreciate if you
>>>>>> comment
>>>>>>>>> before that, if you still have some outstanding ideas.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Dawid
>>>>>>>>>
>>>>>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>>>>>>>>>> Seth is right, I was just about to write that as well. There is a
>>>>>>>>>> problem, though, because some of our TypeSerializers are not
>>>>>>>>>> deterministic even though we use them as if they were. Beam
>>> excludes
>>>>>>>>>> the FloatCoder, for example, and the AvroCoder in certain cases.
>>> I'm
>>>>>>>>>> pretty sure there is also weirdness going on in our
>> KryoSerializer.
>>>>>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>>>>>>>>>>> There is already an implicit assumption the TypeSerializer for
>>> keys
>>>>>> is
>>>>>>>>>>> stable/deterministic, RocksDB compares keys using their
>> serialized
>>>>>> byte
>>>>>>>>>>> strings. I think this is a non-issue (or at least it's not
>>> changing
>>>>>> the
>>>>>>>>>>> status quo).
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twalthr@apache.org
>>>>>>>> wrote:
>>>>>>>>>>>> +1 for getting rid of the TypeComparator interface and rely on
>>> the
>>>>>>>>>>>> serialized representation for grouping.
>>>>>>>>>>>>
>>>>>>>>>>>> Adding a new type to DataStream API is quite difficult at the
>>> moment
>>>>>>>>>>>> due
>>>>>>>>>>>> to too many components that are required: TypeInformation
>> (tries
>>> to
>>>>>>>>>>>> deal
>>>>>>>>>>>> with logical fields for TypeComparators), TypeSerializer (incl.
>>> it's
>>>>>>>>>>>> snapshot interfaces), and TypeComparator (with many methods and
>>>>>>>>>>>> internals such as normalized keys etc.).
>>>>>>>>>>>>
>>>>>>>>>>>> If necessary, we can add more simple comparison-related methods
>>> to
>>>>>> the
>>>>>>>>>>>> TypeSerializer interface itself in the future (like
>>>>>>>>>>>> TypeSerializer.isDeterministic).
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Timo
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>>>>>>>>>>>>> Thanks for publishing the FLIP!
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <
>>> dwysakowicz@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>      1. How to sort/group keys? What representation of the
>> key
>>>>>>>>>>>>>> should we
>>>>>>>>>>>>>>         use? Should we sort on the binary form or should we
>>> depend
>>>>>> on
>>>>>>>>>>>>>>         Comparators being available.
>>>>>>>>>>>>> Initially, I suggested to Dawid (in private) to do the
>>>>>>>>>>>>> sorting/grouping
>>>>>>>>>>>> by using the binary representation. Then my opinion switched
>> and
>>> I
>>>>>>>>>>>> thought
>>>>>>>>>>>> we should use TypeComparator/Comparator because that's what the
>>>>>>>>>>>> DataSet API
>>>>>>>>>>>> uses. After talking to Stephan, I'm again encouraged in my
>>> opinion
>>>>>>>>>>>> to use
>>>>>>>>>>>> the binary representation because it means we can eventually
>> get
>>> rid
>>>>>>>>>>>> of the
>>>>>>>>>>>> TypeComparator interface, which is a bit complicated, and
>>> because we
>>>>>>>>>>>> don't
>>>>>>>>>>>> need any good order in our sort, we only need the grouping.
>>>>>>>>>>>>> This comes with some problems, though: we need to ensure that
>>> the
>>>>>>>>>>>> TypeSerializer of the type we're sorting is
>> stable/deterministic.
>>>>>>>>>>>> Beam has
>>>>>>>>>>>> infrastructure for this in the form of
>>> Coder.verifyDeterministic()
>>>>>> [1]
>>>>>>>>>>>> which we don't have right now and should add if we go down this
>>>>>> path.
>>>>>>>>>>>>>>      2. Where in the stack should we apply the sorting (this
>>>>>> rather a
>>>>>>>>>>>>>>         discussion about internals)
>>>>>>>>>>>>> Here, I'm gravitating towards the third option of implementing
>>> it
>>>>>>>>>>>>> in the
>>>>>>>>>>>> layer of the StreamTask, which probably means implementing a
>>> custom
>>>>>>>>>>>> InputProcessor. I think it's best to do it in this layer
>> because
>>> we
>>>>>>>>>>>> would
>>>>>>>>>>>> not mix concerns of different layers as we would if we
>>> implemented
>>>>>>>>>>>> this as
>>>>>>>>>>>> a custom StreamOperator. I think this solution is also best
>> when
>>> it
>>>>>>>>>>>> comes
>>>>>>>>>>>> to multi-input operators.
>>>>>>>>>>>>>>      3. How should we deal with custom implementations of
>>>>>>>>>>>>>> StreamOperators
>>>>>>>>>>>>> I think the cleanest solution would be to go through the
>>> complete
>>>>>>>>>>>> operator lifecycle for every key, because then the watermark
>>> would
>>>>>> not
>>>>>>>>>>>> oscillate between -Inf and +Inf and we would not break the
>>>>>> semantical
>>>>>>>>>>>> guarantees that we gave to operators so far, in that the
>>> watermark
>>>>>> is
>>>>>>>>>>>> strictly monotonically increasing. However, I don't think this
>>>>>>>>>>>> solution is
>>>>>>>>>>>> feasible because it would come with too much overhead. We
>> should
>>>>>>>>>>>> solve this
>>>>>>>>>>>> problem via documentation and maybe educate people to not query
>>> the
>>>>>>>>>>>> current
>>>>>>>>>>>> watermark or not rely on the watermark being monotonically
>>>>>>>>>>>> increasing in
>>>>>>>>>>>> operator implementations to allow the framework more freedoms
>> in
>>> how
>>>>>>>>>>>> user
>>>>>>>>>>>> programs are executed.
>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>>>


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Yu Li <ca...@gmail.com>.
Hi all,

Sorry for being late to the discussion, but I just noticed there are some
state backend related changes proposed in this FLIP, so would like to share
my two cents.

First of all, for the performance testing result, I'm wondering whether the
sorting cost is counted in the result for both DataSet and refined
DataStream implementations. I could think of the saving of hash computation
and final iteration to emit the word-count result (processing a key at a
time could save such iteration), but I am not sure whether these savings
are of the same order as the cost of comparing the key bytes.

Regardless of the performance result, I agree that the capability of
removing the data after processing a key could significantly reduce the space
required by state, so introducing a new state backend for bounded stream
makes sense.

However, I'm not fully convinced that we should introduce a new
`InternalKeyedStateBackend` interface. I agree that we don't need to take
the overhead of `AbstractKeyedStateBackend` since we don't plan to support
checkpointing for now, but why don't we directly write a state backend
implementation for bounded streams? Or are we planning to introduce more
internal state backends in the future? What's more, the current design of
`InternalKeyedStateBackend` in the FLIP document seems to be extending as
many interfaces as `AbstractKeyedStateBackend` implements, which I guess
is a typo.

Thirdly, I suggest we name the special state backend as
`BoundedStreamInternalStateBackend`. And from our existing javadoc of
`StateBackend` it actually cannot be called a complete state backend...: "A
State Backend defines how the state of a streaming application is stored
and checkpointed".

Lastly, I didn't find a detailed design of the "SingleKeyStateBackend" in
the FLIP, and suggest we write the key design down, such as how to detect
the key switching and remove the data (especially in the non-windowing
case), etc.

Thanks.

Best Regards,
Yu


On Wed, 9 Sep 2020 at 17:18, Kurt Young <yk...@gmail.com> wrote:

> Yes, I didn't intend to block this FLIP, and some of the comments are
> actually implementation details.
> And all of them are handled internally, not visible to users, thus we can
> also change or improve them
> in the future.
>
> Best,
> Kurt
>
>
> On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > I think Kurt's concerns/comments are very valid and we need to implement
> > such things in the future. However, I also think that we need to get
> > started somewhere and I think what's proposed in this FLIP is a good
> > starting point that we can build on. So we should not get paralyzed by
> > thinking too far ahead into the future. Does that make sense?
> >
> > Best,
> > Aljoscha
> >
> > On 08.09.20 16:59, Dawid Wysakowicz wrote:
> > > Ad. 1
> > >
> > > Yes, you are right in principle.
> > >
> > > Let me though clarify my proposal a bit. The proposed sort-style
> > > > execution aims at a generic KeyedProcessFunction where all the
> > > "aggregations" are actually performed in the user code. It tries to
> > > improve the performance by actually removing the need to use RocksDB
> > e.g.:
> > >
> > >      private static final class Summer<K>
> > >              extends KeyedProcessFunction<K, Tuple2<K, Integer>,
> > > Tuple2<K, Integer>> {
> > >
> > >          ....
> > >
> > >          @Override
> > >          public void processElement(
> > >                  Tuple2<K, Integer> value,
> > >                  Context ctx,
> > >                  Collector<Tuple2<K, Integer>> out) throws Exception {
> > >              if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
> > >                  ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
> > >                  timerRegistered.update(true);
> > >              }
> > >              Integer v = counter.value();
> > >              Integer incomingValue = value.f1;
> > >              if (v != null) {
> > >                  v += incomingValue;
> > >              } else {
> > >                  v = incomingValue;
> > >              }
> > >              counter.update(v);
> > >          }
> > >
> > >          ....
> > >
> > >     }
> > >
> > > Therefore I don't think the first part of your reply with separating
> the
> > > write and read workload applies here. We do not aim to create a
> > > competing API with the Table API. We think operations such as joins or
> > > analytical aggregations should be performed in Table API.
> > >
> > > As for the second part I agree it would be nice to fall back to the
> > > sorting approach only if a certain threshold of memory in a State
> > > Backend is used. This has some problems though. We would need a way to
> > > estimate the size of the occupied memory to tell when the threshold is
> > > reached. That is not easily doable by default e.g. in a
> > > MemoryStateBackend, as we do not serialize the values in the state
> > > backend by default. We would have to add that, but this would add the
> > > overhead of the serialization.
> > >
> > > This proposal aims at the cases where we do have a large state that
> will
> > > not fit into the memory and without the change users are forced to use
> > > RocksDB. If the state fits in memory I agree it will be better to do
> > > hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
> > > think it is important to give users the choice to use one or the other
> > > approach. We might discuss which approach should be the default for
> > > RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
> > > user configured state backend or sorting-based with a single key at a
> > > time backend. Moreover we could think if we should let users choose the
> > > sort vs hash "state backend" per operator. Would that suffice?
> > >
> > > Ad. 2
> > >
> > > I still think we can just use the first X bytes of the serialized form
> > > as the normalized key and fallback to comparing full keys on clashes.
> It
> > > is because we are actually not interested in a logical order, but we
> > > care only about the "grouping" aspect of the sorting. Therefore I think
> > > > it's enough to compare only parts of the full key as the normalized key.
> > >
> > > Thanks again for the really nice and thorough feedback!
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 08/09/2020 14:47, Kurt Young wrote:
> > >> Regarding #1, yes the state backend is definitely hash-based
> execution.
> > >> However there are some differences between
> > >> batch hash-based execution. The key difference is *random access &
> > >> read/write mixed workload*. For example, by using
> > >> state backend in streaming execution, one have to mix the read and
> write
> > >> operations and all of them are actually random
> > >> access. But in a batch hash execution, we could divide the phases into
> > >> write and read. For example, we can build the
> > >> hash table first, with only write operations. And once the build is
> > done,
> > >> we can start to read and trigger the user codes.
> > >> Take hash aggregation which blink planner implemented as an example,
> > during
> > >> building phase, as long as the hash map
> > >> could fit into memory, we will update the accumulators directly in the
> > hash
> > >> map. And once we are running out of memory,
> > >> we then fall back to sort based execution. It improves the
> performance a
> > >> lot if the incoming data can be processed in
> > >> memory.
> > >>
> > > >> Regarding #2, IIUC you are actually describing a binary format of key, not
> > > >> normalized key which is used in DataSet. I will take String for example.
> > > >> If we have lots of keys with length all greater than, let's say 20. In your
> > > >> proposal, you will encode the whole string in the prefix of your composed
> > > >> data ( <key> + <timestamp> + <record> ). And when you compare records, you
> > > >> will actually compare the *whole* key of the record. For normalized key,
> > > >> it's fixed-length in this case, IIRC it will take 8 bytes to represent the
> > > >> string. And the sorter will store the normalized key and offset in a
> > > >> dedicated array. When doing the sorting, it only sorts this *small* array.
> > > >> If the normalized keys are different, you could immediately tell which is
> > > >> greater from normalized keys. You only have to compare the full keys if the
> > > >> normalized keys are equal and you know in this case the normalized key
> > > >> couldn't represent the full key. The reason why Dataset is doing this is
> > > >> it's super cache efficient by sorting the *small* array.
> > > >> The idea is borrowed from this paper [1]. Let me know if I missed or
> > > >> misunderstood anything.
> > >>
> > >> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
> > >> cache-sensitive parallel external sort)
> > >>
> > >> Best,
> > >> Kurt
> > >>
> > >>
> > > >> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakowicz@apache.org>
> > > >> wrote:
> > >>
> > >>> Hey Kurt,
> > >>>
> > >>> Thank you for comments!
> > >>>
> > > >>> Ad. 1 I might have missed something here, but as far as I see it is that
> > > >>> using the current execution stack with regular state backends (RocksDB
> > > >>> in particular if we want to have spilling capabilities) is equivalent to
> > > >>> hash-based execution. I can see a different spilling state backend
> > > >>> implementation in the future, but I think it is not batch specific. Or am
> > > >>> I missing something?
> > >>>
> > > >>> Ad. 2 Totally agree that normalized keys are important to the
> > > >>> performance. I think though TypeComparators are not a necessity to have
> > > >>> that. Actually this proposal is heading towards only ever performing
> > > >>> "normalized keys" comparison. I have not included in the proposal the
> > > >>> binary format which we will use for sorting (partially because I forgot,
> > > >>> and partially because I thought it was too much of an implementation
> > > >>> detail). Let me include it here though, as it might clear the situation
> > > >>> a bit here.
> > >>>
> > > >>> In DataSet, at times we have KeySelectors which extract keys based on
> > > >>> field indices or names. This allows in certain situations to extract the
> > > >>> key from serialized records. Compared to DataSet, in DataStream, the key
> > > >>> is always described with a black-box KeySelector, or differently with a
> > > >>> function which extracts a key from a deserialized record. In turn there
> > > >>> is no way to create a comparator that could compare records by
> > > >>> extracting the key from a serialized record (neither with, nor without
> > > >>> key normalization). We suggest that the input for the sorter will be
> > >>>
> > >>> <key> + <timestamp> + <record>
> > >>>
> > > >>> Without having the key prepended we would have to deserialize the record
> > > >>> for every key comparison.
> > > >>>
> > > >>> Therefore if we agree that we perform binary comparison for keys (which
> > > >>> are always prepended), it is actually equivalent to a DataSet with
> > > >>> TypeComparators that support key normalization.
> > >>>
> > >>> Let me know if that is clear, or I have missed something here.
> > >>>
> > >>> Best,
> > >>>
> > >>> Dawid
> > >>>
> > >>> On 08/09/2020 03:39, Kurt Young wrote:
> > > >>>> Hi Dawid, thanks for bringing this up, it's really exciting to see that
> > > >>>> batch execution is introduced in DataStream. From the flip, it seems
> > > >>>> we are sticking with sort based execution mode (at least for now), which
> > > >>>> will sort the whole input data before any *keyed* operation is
> > > >>>> executed. I have two comments here:
> > > >>>>
> > > >>>> 1. Do we want to introduce hash-based execution in the future? Sort is a
> > > >>>> safe choice but not the best in lots of cases. IIUC we only need
> > > >>>> to make sure that before the framework finishes dealing with one key, the
> > > >>>> operator doesn't see any data belonging to other keys, thus
> > > >>>> hash-based execution would also do the trick. One tricky thing the
> > > >>>> framework might need to deal with is memory constraint and spilling
> > > >>>> in the hash map, but Flink also has some good knowledge about this
> > > >>>> stuff.
> > > >>>> 2. Going back to sort-based execution and how to sort keys. From my
> > > >>>> experience, the performance of sorting would be one of the most important
> > > >>>> things if we want to achieve good performance of batch execution. And
> > > >>>> normalized keys are actually the key of the performance of sorting.
> > > >>>> If we want to get rid of TypeComparator, I think we still need to find a
> > > >>>> way to introduce this back.
> > >>>>
> > >>>> Best,
> > >>>> Kurt
> > >>>>
> > >>>>
> > >>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <
> aljoscha@apache.org>
> > >>> wrote:
> > >>>>> Yes, I think we can address the problem of indeterminacy in a
> > separate
> > >>>>> FLIP because we're already in it.
> > >>>>>
> > >>>>> Aljoscha
> > >>>>>
> > >>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
> > >>>>>> @Seth That's a very good point. I agree that RocksDB has the same
> > >>>>>> problem. I think we can use the same approach for the sorted
> > shuffles
> > >>>>>> then. @Aljoscha I agree we should think about making it more
> > resilient,
> > >>>>>> as I guess users might have problems already if they use keys with
> > >>>>>> non-deterministic binary representation. How do you feel about
> > >>>>>> addressing that separately purely to limit the scope of this FLIP?
> > >>>>>>
> > >>>>>> @Aljoscha I tend to agree with you that the best place to actually
> > >>> place
> > >>>>>> the sorting would be in the InputProcessor(s). If there are no
> more
> > >>>>>> suggestions in respect to that issue. I'll put this proposal for
> > >>> voting.
> > >>>>>> @all Thank you for the feedback so far. I'd like to start a voting
> > >>>>>> thread on the proposal tomorrow. Therefore I'd appreciate if you
> > >>> comment
> > >>>>>> before that, if you still have some outstanding ideas.
> > >>>>>>
> > >>>>>> Best,
> > >>>>>>
> > >>>>>> Dawid
> > >>>>>>
> > >>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
> > >>>>>>> Seth is right, I was just about to write that as well. There is a
> > >>>>>>> problem, though, because some of our TypeSerializers are not
> > >>>>>>> deterministic even though we use them as if they were. Beam
> > excludes
> > >>>>>>> the FloatCoder, for example, and the AvroCoder in certain cases.
> > I'm
> > >>>>>>> pretty sure there is also weirdness going on in our
> KryoSerializer.
> > >>>>>>>
> > >>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
> > >>>>>>>> There is already an implicit assumption the TypeSerializer for
> > keys
> > >>> is
> > >>>>>>>> stable/deterministic, RocksDB compares keys using their
> serialized
> > >>> byte
> > >>>>>>>> strings. I think this is a non-issue (or at least it's not
> > changing
> > >>> the
> > >>>>>>>> status quo).
> > >>>>>>>>
> > >>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twalthr@apache.org
> >
> > >>>>> wrote:
> > >>>>>>>>> +1 for getting rid of the TypeComparator interface and rely on
> > the
> > >>>>>>>>> serialized representation for grouping.
> > >>>>>>>>>
> > >>>>>>>>> Adding a new type to DataStream API is quite difficult at the
> > moment
> > >>>>>>>>> due
> > >>>>>>>>> to too many components that are required: TypeInformation
> (tries
> > to
> > >>>>>>>>> deal
> > >>>>>>>>> with logical fields for TypeComparators), TypeSerializer (incl.
> > it's
> > >>>>>>>>> snapshot interfaces), and TypeComparator (with many methods and
> > >>>>>>>>> internals such normalized keys etc.).
> > >>>>>>>>>
> > >>>>>>>>> If necessary, we can add more simple comparison-related methods
> > to
> > >>> the
> > >>>>>>>>> TypeSerializer interface itself in the future (like
> > >>>>>>>>> TypeSerializer.isDeterministic).
> > >>>>>>>>>
> > >>>>>>>>> Regards,
> > >>>>>>>>> Timo
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
> > >>>>>>>>>> Thanks for publishing the FLIP!
> > >>>>>>>>>>
> > >>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <
> > dwysakowicz@apache.org>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>>      1. How to sort/group keys? What representation of the
> key
> > >>>>>>>>>>> should we
> > >>>>>>>>>>>         use? Should we sort on the binary form or should we
> > depend
> > >>> on
> > >>>>>>>>>>>         Comparators being available.
> > >>>>>>>>>> Initially, I suggested to Dawid (in private) to do the
> > >>>>>>>>>> sorting/grouping
> > >>>>>>>>> by using the binary representation. Then my opinion switched
> and
> > I
> > >>>>>>>>> thought
> > >>>>>>>>> we should use TypeComparator/Comparator because that's what the
> > >>>>>>>>> DataSet API
> > >>>>>>>>> uses. After talking to Stephan, I'm again encouraged in my
> > opinion
> > >>>>>>>>> to use
> > >>>>>>>>> the binary representation because it means we can eventually
> get
> > rid
> > >>>>>>>>> of the
> > >>>>>>>>> TypeComparator interface, which is a bit complicated, and
> > because we
> > >>>>>>>>> don't
> > >>>>>>>>> need any good order in our sort, we only need the grouping.
> > >>>>>>>>>> This comes with some problems, though: we need to ensure that
> > the
> > >>>>>>>>> TypeSerializer of the type we're sorting is
> stable/deterministic.
> > >>>>>>>>> Beam has
> > >>>>>>>>> infrastructure for this in the form of
> > Coder.verifyDeterministic()
> > >>> [1]
> > >>>>>>>>> which we don't have right now and should add if we go down this
> > >>> path.
> > >>>>>>>>>>>      2. Where in the stack should we apply the sorting (this
> > >>> rather a
> > >>>>>>>>>>>         discussion about internals)
> > >>>>>>>>>> Here, I'm gravitating towards the third option of implementing
> > it
> > >>>>>>>>>> in the
> > >>>>>>>>> layer of the StreamTask, which probably means implementing a
> > custom
> > >>>>>>>>> InputProcessor. I think it's best to do it in this layer
> because
> > we
> > >>>>>>>>> would
> > >>>>>>>>> not mix concerns of different layers as we would if we
> > implemented
> > >>>>>>>>> this as
> > >>>>>>>>> a custom StreamOperator. I think this solution is also best
> when
> > it
> > >>>>>>>>> comes
> > >>>>>>>>> to multi-input operators.
> > >>>>>>>>>>>      3. How should we deal with custom implementations of
> > >>>>>>>>>>> StreamOperators
> > >>>>>>>>>> I think the cleanest solution would be to go through the
> > complete
> > >>>>>>>>> operator lifecycle for every key, because then the watermark
> > would
> > >>> not
> > >>>>>>>>> oscillate between -Inf and +Inf and we would not break the
> > >>> semantical
> > >>>>>>>>> guarantees that we gave to operators so far, in that the
> > watermark
> > >>> is
> > >>>>>>>>> strictly monotonically increasing. However, I don't think this
> > >>>>>>>>> solution is
> > >>>>>>>>> feasible because it would come with too much overhead. We
> should
> > >>>>>>>>> solve this
> > >>>>>>>>> problem via documentation and maybe educate people to not query
> > the
> > >>>>>>>>> current
> > >>>>>>>>> watermark or not rely on the watermark being monotonically
> > >>>>>>>>> increasing in
> > >>>>>>>>> operator implementations to allow the framework more freedoms
> in
> > how
> > >>>>>>>>> user
> > >>>>>>>>> programs are executed.
> > >>>>>>>>>> Aljoscha
> > >>>>>>>>>>
> > >>>>>>>>>> [1]
> > >>>
> >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
> > >>>
> > >
> >
> >
>

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Kurt Young <yk...@gmail.com>.
Yes, I didn't intend to block this FLIP, and some of the comments are
actually implementation details.
And all of them are handled internally, not visible to users, thus we can
also change or improve them in the future.

Best,
Kurt


On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek <al...@apache.org> wrote:

> I think Kurt's concerns/comments are very valid and we need to implement
> such things in the future. However, I also think that we need to get
> started somewhere and I think what's proposed in this FLIP is a good
> starting point that we can build on. So we should not get paralyzed by
> thinking too far ahead into the future. Does that make sense?
>
> Best,
> Aljoscha
>
> On 08.09.20 16:59, Dawid Wysakowicz wrote:
> > Ad. 1
> >
> > Yes, you are right in principle.
> >
> > Let me though clarify my proposal a bit. The proposed sort-style
> > execution aims at a generic KeyedProcessFunction where all the
> > "aggregations" are actually performed in the user code. It tries to
> > improve the performance by actually removing the need to use RocksDB,
> > e.g.:
> >
> >      private static final class Summer<K>
> >              extends KeyedProcessFunction<K, Tuple2<K, Integer>,
> > Tuple2<K, Integer>> {
> >
> >          ....
> >
> >          @Override
> >          public void processElement(
> >                  Tuple2<K, Integer> value,
> >                  Context ctx,
> >                  Collector<Tuple2<K, Integer>> out) throws Exception {
> >              if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
> >                  ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
> >                  timerRegistered.update(true);
> >              }
> >              Integer v = counter.value();
> >              Integer incomingValue = value.f1;
> >              if (v != null) {
> >                  v += incomingValue;
> >              } else {
> >                  v = incomingValue;
> >              }
> >              counter.update(v);
> >          }
> >
> >          ....
> >
> >     }
> >
> > Therefore I don't think the first part of your reply with separating the
> > write and read workload applies here. We do not aim to create a
> > competing API with the Table API. We think operations such as joins or
> > analytical aggregations should be performed in Table API.
> >
> > As for the second part I agree it would be nice to fall back to the
> > sorting approach only if a certain threshold of memory in a State
> > Backend is used. This has some problems though. We would need a way to
> > estimate the size of the occupied memory to tell when the threshold is
> > reached. That is not easily doable by default e.g. in a
> > MemoryStateBackend, as we do not serialize the values in the state
> > backend by default. We would have to add that, but this would add the
> > overhead of the serialization.
> >
> > This proposal aims at the cases where we do have a large state that will
> > not fit into the memory and without the change users are forced to use
> > RocksDB. If the state fits in memory I agree it will be better to do
> > hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
> > think it is important to give users the choice to use one or the other
> > approach. We might discuss which approach should be the default for
> > RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
> > user configured state backend or sorting-based with a single key at a
> > time backend. Moreover we could think if we should let users choose the
> > sort vs hash "state backend" per operator. Would that suffice?
> >
> > Ad. 2
> >
> > I still think we can just use the first X bytes of the serialized form
> > as the normalized key and fall back to comparing full keys on clashes. It
> > is because we are actually not interested in a logical order, but we
> > care only about the "grouping" aspect of the sorting. Therefore I think
> > it's enough to compare only parts of the full key as the normalized key.
> >
> > Thanks again for the really nice and thorough feedback!
> >
> > Best,
> >
> > Dawid
> >
> > On 08/09/2020 14:47, Kurt Young wrote:
> >> Regarding #1, yes the state backend is definitely hash-based execution.
> >> However there are some differences between it and batch hash-based
> >> execution. The key difference is *random access & read/write mixed
> >> workload*. For example, by using a state backend in streaming execution,
> >> one has to mix the read and write operations and all of them are actually
> >> random access. But in a batch hash execution, we could divide the phases
> >> into write and read. For example, we can build the hash table first, with
> >> only write operations. And once the build is done, we can start to read
> >> and trigger the user code.
> >> Take the hash aggregation which the blink planner implemented as an
> >> example: during the building phase, as long as the hash map could fit into
> >> memory, we will update the accumulators directly in the hash map. And once
> >> we are running out of memory, we then fall back to sort-based execution.
> >> It improves the performance a lot if the incoming data can be processed in
> >> memory.
> >>
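The hash-first, sort-on-overflow idea described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the blink planner's code: the class name HashThenSortAggSketch, the maxEntries budget (a count of distinct keys standing in for a memory limit) and the in-memory "spilled" list (standing in for spill files) are all hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: sum per key in a hash map, falling back to sort-and-merge once a budget is hit. */
final class HashThenSortAggSketch {

    /** A (key, partial sum) pair; stands in for a spilled accumulator. */
    static final class Partial {
        final String key;
        final int sum;

        Partial(String key, int sum) {
            this.key = key;
            this.sum = sum;
        }
    }

    /**
     * @param records    input as (key, value) pairs
     * @param maxEntries hypothetical memory budget, expressed as a number of distinct keys
     */
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> records, int maxEntries) {
        Map<String, Integer> table = new HashMap<>();
        List<Partial> spilled = new ArrayList<>(); // stands in for spill files

        // Build phase: only accumulator updates, no user code is triggered yet.
        for (Map.Entry<String, Integer> r : records) {
            if (!table.containsKey(r.getKey()) && table.size() >= maxEntries) {
                // Budget exhausted: move all partial sums out of the hash table.
                table.forEach((k, v) -> spilled.add(new Partial(k, v)));
                table.clear();
            }
            table.merge(r.getKey(), r.getValue(), Integer::sum);
        }

        if (spilled.isEmpty()) {
            return table; // everything fit in memory, no sort needed
        }

        // Fallback: sort all partial sums by key, then merge adjacent equal keys.
        table.forEach((k, v) -> spilled.add(new Partial(k, v)));
        spilled.sort(Comparator.comparing(p -> p.key));

        Map<String, Integer> result = new LinkedHashMap<>();
        String currentKey = null;
        int currentSum = 0;
        for (Partial p : spilled) {
            if (currentKey != null && !currentKey.equals(p.key)) {
                result.put(currentKey, currentSum);
                currentSum = 0;
            }
            currentKey = p.key;
            currentSum += p.sum;
        }
        if (currentKey != null) {
            result.put(currentKey, currentSum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        records.add(new AbstractMap.SimpleEntry<>("a", 1));
        records.add(new AbstractMap.SimpleEntry<>("b", 2));
        records.add(new AbstractMap.SimpleEntry<>("c", 4));
        records.add(new AbstractMap.SimpleEntry<>("a", 3));
        System.out.println(sumByKey(records, 2));
    }
}
```

With a large enough budget the sort step never runs, which is the case where the hash-based path pays off.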
> >> Regarding #2, IIUC you are actually describing a binary format of key, not
> >> normalized key which is used in DataSet. I will take String for example.
> >> If we have lots of keys with length all greater than, let's say 20. In your
> >> proposal, you will encode the whole string in the prefix of your composed
> >> data ( <key> + <timestamp> + <record> ). And when you compare records, you
> >> will actually compare the *whole* key of the record. For normalized key,
> >> it's fixed-length in this case, IIRC it will take 8 bytes to represent the
> >> string. And the sorter will store the normalized key and offset in a
> >> dedicated array. When doing the sorting, it only sorts this *small* array.
> >> If the normalized keys are different, you could immediately tell which is
> >> greater from normalized keys. You only have to compare the full keys if the
> >> normalized keys are equal and you know in this case the normalized key
> >> couldn't represent the full key. The reason why Dataset is doing this is
> >> it's super cache efficient by sorting the *small* array.
> >> The idea is borrowed from this paper [1]. Let me know if I missed or
> >> misunderstood anything.
> >>
> >> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
> >> cache-sensitive parallel external sort)
> >>
> >> Best,
> >> Kurt
> >>
> >>
> >> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakowicz@apache.org>
> >> wrote:
> >>
> >>> Hey Kurt,
> >>>
> >>> Thank you for comments!
> >>>
> >>> Ad. 1 I might have missed something here, but as far as I see it is that
> >>> using the current execution stack with regular state backends (RocksDB
> >>> in particular if we want to have spilling capabilities) is equivalent to
> >>> hash-based execution. I can see a different spilling state backend
> >>> implementation in the future, but I think it is not batch specific. Or am
> >>> I missing something?
> >>>
> >>> Ad. 2 Totally agree that normalized keys are important to the
> >>> performance. I think though TypeComparators are not a necessity to have
> >>> that. Actually this proposal is heading towards only ever performing
> >>> "normalized keys" comparison. I have not included in the proposal the
> >>> binary format which we will use for sorting (partially because I forgot,
> >>> and partially because I thought it was too much of an implementation
> >>> detail). Let me include it here though, as it might clear the situation
> >>> a bit here.
> >>>
> >>> In DataSet, at times we have KeySelectors which extract keys based on
> >>> field indices or names. This allows in certain situations to extract the
> >>> key from serialized records. Compared to DataSet, in DataStream, the key
> >>> is always described with a black-box KeySelector, or differently with a
> >>> function which extracts a key from a deserialized record. In turn there
> >>> is no way to create a comparator that could compare records by
> >>> extracting the key from a serialized record (neither with, nor without
> >>> key normalization). We suggest that the input for the sorter will be
> >>>
> >>> <key> + <timestamp> + <record>
> >>>
> >>> Without having the key prepended we would have to deserialize the record
> >>> for every key comparison.
> >>>
> >>> Therefore if we agree that we perform binary comparison for keys (which
> >>> are always prepended), it is actually equivalent to a DataSet with
> >>> TypeComparators that support key normalization.
> >>>
> >>> Let me know if that is clear, or I have missed something here.
> >>>
> >>> Best,
> >>>
> >>> Dawid
> >>>
> >>> On 08/09/2020 03:39, Kurt Young wrote:
> >>>> Hi Dawid, thanks for bringing this up, it's really exciting to see that
> >>>> batch execution is introduced in DataStream. From the flip, it seems
> >>>> we are sticking with sort based execution mode (at least for now), which
> >>>> will sort the whole input data before any *keyed* operation is
> >>>> executed. I have two comments here:
> >>>>
> >>>> 1. Do we want to introduce hash-based execution in the future? Sort is a
> >>>> safe choice but not the best in lots of cases. IIUC we only need
> >>>> to make sure that before the framework finishes dealing with one key, the
> >>>> operator doesn't see any data belonging to other keys, thus
> >>>> hash-based execution would also do the trick. One tricky thing the
> >>>> framework might need to deal with is memory constraint and spilling
> >>>> in the hash map, but Flink also has some good knowledge about this
> >>>> stuff.
> >>>> 2. Going back to sort-based execution and how to sort keys. From my
> >>>> experience, the performance of sorting would be one of the most important
> >>>> things if we want to achieve good performance of batch execution. And
> >>>> normalized keys are actually the key of the performance of sorting.
> >>>> If we want to get rid of TypeComparator, I think we still need to find a
> >>>> way to introduce this back.
> >>>>
> >>>> Best,
> >>>> Kurt
> >>>>
> >>>>
> >>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <al...@apache.org>
> >>> wrote:
> >>>>> Yes, I think we can address the problem of indeterminacy in a
> separate
> >>>>> FLIP because we're already in it.
> >>>>>
> >>>>> Aljoscha
> >>>>>
> >>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
> >>>>>> @Seth That's a very good point. I agree that RocksDB has the same
> >>>>>> problem. I think we can use the same approach for the sorted
> shuffles
> >>>>>> then. @Aljoscha I agree we should think about making it more
> resilient,
> >>>>>> as I guess users might have problems already if they use keys with
> >>>>>> non-deterministic binary representation. How do you feel about
> >>>>>> addressing that separately purely to limit the scope of this FLIP?
> >>>>>>
> >>>>>> @Aljoscha I tend to agree with you that the best place to actually
> >>> place
> >>>>>> the sorting would be in the InputProcessor(s). If there are no more
> >>>>>> suggestions in respect to that issue. I'll put this proposal for
> >>> voting.
> >>>>>> @all Thank you for the feedback so far. I'd like to start a voting
> >>>>>> thread on the proposal tomorrow. Therefore I'd appreciate if you
> >>> comment
> >>>>>> before that, if you still have some outstanding ideas.
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Dawid
> >>>>>>
> >>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
> >>>>>>> Seth is right, I was just about to write that as well. There is a
> >>>>>>> problem, though, because some of our TypeSerializers are not
> >>>>>>> deterministic even though we use them as if they were. Beam
> excludes
> >>>>>>> the FloatCoder, for example, and the AvroCoder in certain cases.
> I'm
> >>>>>>> pretty sure there is also weirdness going on in our KryoSerializer.
> >>>>>>>
> >>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
> >>>>>>>> There is already an implicit assumption the TypeSerializer for
> keys
> >>> is
> >>>>>>>> stable/deterministic, RocksDB compares keys using their serialized
> >>> byte
> >>>>>>>> strings. I think this is a non-issue (or at least it's not
> changing
> >>> the
> >>>>>>>> status quo).
> >>>>>>>>
> >>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <tw...@apache.org>
> >>>>> wrote:
> >>>>>>>>> +1 for getting rid of the TypeComparator interface and rely on
> the
> >>>>>>>>> serialized representation for grouping.
> >>>>>>>>>
> >>>>>>>>> Adding a new type to DataStream API is quite difficult at the
> moment
> >>>>>>>>> due
> >>>>>>>>> to too many components that are required: TypeInformation (tries
> to
> >>>>>>>>> deal
> >>>>>>>>> with logical fields for TypeComparators), TypeSerializer (incl.
> it's
> >>>>>>>>> snapshot interfaces), and TypeComparator (with many methods and
> >>>>>>>>> internals such normalized keys etc.).
> >>>>>>>>>
> >>>>>>>>> If necessary, we can add more simple comparison-related methods
> to
> >>> the
> >>>>>>>>> TypeSerializer interface itself in the future (like
> >>>>>>>>> TypeSerializer.isDeterministic).
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Timo
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
> >>>>>>>>>> Thanks for publishing the FLIP!
> >>>>>>>>>>
> >>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <
> dwysakowicz@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>      1. How to sort/group keys? What representation of the key
> >>>>>>>>>>> should we
> >>>>>>>>>>>         use? Should we sort on the binary form or should we
> depend
> >>> on
> >>>>>>>>>>>         Comparators being available.
> >>>>>>>>>> Initially, I suggested to Dawid (in private) to do the
> >>>>>>>>>> sorting/grouping
> >>>>>>>>> by using the binary representation. Then my opinion switched and
> I
> >>>>>>>>> thought
> >>>>>>>>> we should use TypeComparator/Comparator because that's what the
> >>>>>>>>> DataSet API
> >>>>>>>>> uses. After talking to Stephan, I'm again encouraged in my
> opinion
> >>>>>>>>> to use
> >>>>>>>>> the binary representation because it means we can eventually get
> rid
> >>>>>>>>> of the
> >>>>>>>>> TypeComparator interface, which is a bit complicated, and
> because we
> >>>>>>>>> don't
> >>>>>>>>> need any good order in our sort, we only need the grouping.
> >>>>>>>>>> This comes with some problems, though: we need to ensure that
> the
> >>>>>>>>> TypeSerializer of the type we're sorting is stable/deterministic.
> >>>>>>>>> Beam has
> >>>>>>>>> infrastructure for this in the form of
> Coder.verifyDeterministic()
> >>> [1]
> >>>>>>>>> which we don't have right now and should add if we go down this
> >>> path.
> >>>>>>>>>>>      2. Where in the stack should we apply the sorting (this
> >>> rather a
> >>>>>>>>>>>         discussion about internals)
> >>>>>>>>>> Here, I'm gravitating towards the third option of implementing
> it
> >>>>>>>>>> in the
> >>>>>>>>> layer of the StreamTask, which probably means implementing a
> custom
> >>>>>>>>> InputProcessor. I think it's best to do it in this layer because
> we
> >>>>>>>>> would
> >>>>>>>>> not mix concerns of different layers as we would if we
> implemented
> >>>>>>>>> this as
> >>>>>>>>> a custom StreamOperator. I think this solution is also best when
> it
> >>>>>>>>> comes
> >>>>>>>>> to multi-input operators.
> >>>>>>>>>>>      3. How should we deal with custom implementations of
> >>>>>>>>>>> StreamOperators
> >>>>>>>>>> I think the cleanest solution would be to go through the
> complete
> >>>>>>>>> operator lifecycle for every key, because then the watermark
> would
> >>> not
> >>>>>>>>> oscillate between -Inf and +Inf and we would not break the
> >>> semantical
> >>>>>>>>> guarantees that we gave to operators so far, in that the
> watermark
> >>> is
> >>>>>>>>> strictly monotonically increasing. However, I don't think this
> >>>>>>>>> solution is
> >>>>>>>>> feasible because it would come with too much overhead. We should
> >>>>>>>>> solve this
> >>>>>>>>> problem via documentation and maybe educate people to not query
> the
> >>>>>>>>> current
> >>>>>>>>> watermark or not rely on the watermark being monotonically
> >>>>>>>>> increasing in
> >>>>>>>>> operator implementations to allow the framework more freedoms in
> how
> >>>>>>>>> user
> >>>>>>>>> programs are executed.
> >>>>>>>>>> Aljoscha
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
> >>>
> >
>
>

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Aljoscha Krettek <al...@apache.org>.
I think Kurt's concerns/comments are very valid and we need to implement 
such things in the future. However, I also think that we need to get 
started somewhere and I think what's proposed in this FLIP is a good 
starting point that we can build on. So we should not get paralyzed by 
thinking too far ahead into the future. Does that make sense?

Best,
Aljoscha

On 08.09.20 16:59, Dawid Wysakowicz wrote:
> Ad. 1
> 
> Yes, you are right in principle.
> 
> Let me though clarify my proposal a bit. The proposed sort-style
> execution aims at a generic KeyedProcessFunction where all the
> "aggregations" are actually performed in the user code. It tries to
> improve the performance by actually removing the need to use RocksDB, e.g.:
> 
>      private static final class Summer<K>
>              extends KeyedProcessFunction<K, Tuple2<K, Integer>,
> Tuple2<K, Integer>> {
> 
>          ....
> 
>          @Override
>          public void processElement(
>                  Tuple2<K, Integer> value,
>                  Context ctx,
>                  Collector<Tuple2<K, Integer>> out) throws Exception {
>              if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>                  ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>                  timerRegistered.update(true);
>              }
>              Integer v = counter.value();
>              Integer incomingValue = value.f1;
>              if (v != null) {
>                  v += incomingValue;
>              } else {
>                  v = incomingValue;
>              }
>              counter.update(v);
>          }
> 
>          ....
> 
>     }
> 
> Therefore I don't think the first part of your reply with separating the
> write and read workload applies here. We do not aim to create a
> competing API with the Table API. We think operations such as joins or
> analytical aggregations should be performed in Table API.
> 
> As for the second part I agree it would be nice to fall back to the
> sorting approach only if a certain threshold of memory in a State
> Backend is used. This has some problems though. We would need a way to
> estimate the size of the occupied memory to tell when the threshold is
> reached. That is not easily doable by default e.g. in a
> MemoryStateBackend, as we do not serialize the values in the state
> backend by default. We would have to add that, but this would add the
> overhead of the serialization.
> 
> This proposal aims at the cases where we do have a large state that will
> not fit into the memory and without the change users are forced to use
> RocksDB. If the state fits in memory I agree it will be better to do
> hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
> think it is important to give users the choice to use one or the other
> approach. We might discuss which approach should be the default for
> RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
> user configured state backend or sorting-based with a single key at a
> time backend. Moreover we could think if we should let users choose the
> sort vs hash "state backend" per operator. Would that suffice?
> 
> Ad. 2
> 
> I still think we can just use the first X bytes of the serialized form
> as the normalized key and fallback to comparing full keys on clashes. It
> is because we are actually not interested in a logical order, but we
> care only about the "grouping" aspect of the sorting. Therefore I think
> its enough to compare only parts of the full key as the normalized key.
> 
> Thanks again for the really nice and thorough feedback!
> 
> Best,
> 
> Dawid
> 
> On 08/09/2020 14:47, Kurt Young wrote:
>> Regarding #1, yes the state backend is definitely hash-based execution.
>> However there are some differences between
>> batch hash-based execution. The key difference is *random access &
>> read/write mixed workload". For example, by using
>> state backend in streaming execution, one have to mix the read and write
>> operations and all of them are actually random
>> access. But in a batch hash execution, we could divide the phases into
>> write and read. For example, we can build the
>> hash table first, with only write operations. And once the build is done,
>> we can start to read and trigger the user codes.
>> Take hash aggregation which blink planner implemented as an example, during
>> building phase, as long as the hash map
>> could fit into memory, we will update the accumulators directly in the hash
>> map. And once we are running out of memory,
>> we then fall back to sort based execution. It improves the performance a
>> lot if the incoming data can be processed in
>> memory.
>>
>> Regarding #2, IIUC you are actually describing a binary format of key, not
>> normalized key which is used in DataSet. I will
>> take String for example. If we have lots of keys with length all greater
>> than, let's say 20. In your proposal, you will encode
>> the whole string in the prefix of your composed data ( <key> + <timestamp>
>> + <record> ). And when you compare
>> records, you will actually compare the *whole* key of the record. For
>> normalized key, it's fixed-length in this case, IIRC it will
>> take 8 bytes to represent the string. And the sorter will store the
>> normalized key and offset in a dedicated array. When doing
>> the sorting, it only sorts this *small* array. If the normalized keys are
>> different, you could immediately tell which is greater from
>> normalized keys. You only have to compare the full keys if the normalized
>> keys are equal and you know in this case the normalized
>> key couldn't represent the full key. The reason why Dataset is doing this
>> is it's super cache efficient by sorting the *small* array.
>> The idea is borrowed from this paper [1]. Let me know if I missed or
>> misunderstood anything.
>>
>> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
>> cache-sensitive parallel external sort)
>>
>> Best,
>> Kurt
>>
>>
>> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>> Hey Kurt,
>>>
>>> Thank you for comments!
>>>
>>> Ad. 1 I might have missed something here, but as far as I see it is that
>>> using the current execution stack with regular state backends (RocksDB
>>> in particular if we want to have spilling capabilities) is equivalent to
>>> hash-based execution. I can see a different spilling state backend
>>> implementation in the future, but I think it is not batch specifc. Or am
>>> I missing something?
>>>
>>> Ad. 2 Totally agree that normalized keys are important to the
>>> performance. I think though TypeComparators are not a necessity to have
>>> that. Actually  this proposal is heading towards only ever performing
>>> "normalized keys" comparison. I have not included in the proposal the
>>> binary format which we will use for sorting (partially because I forgot,
>>> and partially because I thought it was too much of an implementation
>>> detail). Let me include it here though, as it might clear the situation
>>> a bit here.
>>>
>>> In DataSet, at times we have KeySelectors which extract keys based on
>>> field indices or names. This allows in certain situation to extract the
>>> key from serialized records. Compared to DataSet, in DataStream, the key
>>> is always described with a black-box KeySelector, or differently with a
>>> function which extracts a key from a deserialized record.  In turn there
>>> is no way to create a comparator that could compare records by
>>> extracting the key from a serialized record (neither with, nor without
>>> key normalization). We suggest that the input for the sorter will be
>>>
>>> <key> + <timestamp> + <record>
>>>
>>> Without having the key prepended we would have to deserialize the record
>>> for every key comparison.
>>>
>>> Therefore if we agree that we perform binary comparison for keys (which
>>> are always prepended), it is actually equivalent to a DataSet with
>>> TypeComparators that support key normalization.
>>>
>>> Let me know if that is clear, or I have missed something here.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 08/09/2020 03:39, Kurt Young wrote:
>>>> Hi Dawid, thanks for bringing this up, it's really exciting to see that
>>>> batch execution is introduced in DataStream. From the flip, it seems
>>>> we are sticking with sort based execution mode (at least for now), which
>>>> will sort the whole input data before any *keyed* operation is
>>>> executed. I have two comments here:
>>>>
>>>> 1. Do we want to introduce hash-based execution in the future? Sort is a
>>>> safe choice but not the best in lots of cases. IIUC we only need
>>>> to make sure that before the framework finishes dealing with one key, the
>>>> operator doesn't see any data belonging to other keys, thus
>>>> hash-based execution would also do the trick. Oon tricky thing the
>>>> framework might need to deal with is memory constraint and spilling
>>>> in the hash map, but Flink also has some good knowledge about these
>>> stuff.
>>>> 2. Going back to sort-based execution and how to sort keys. From my
>>>> experience, the performance of sorting would be one the most important
>>>> things if we want to achieve good performance of batch execution. And
>>>> normalized keys are actually the key of the performance of sorting.
>>>> If we want to get rid of TypeComparator, I think we still need to find a
>>>> way to introduce this back.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>>> Yes, I think we can address the problem of indeterminacy in a separate
>>>>> FLIP because we're already in it.
>>>>>
>>>>> Aljoscha
>>>>>
>>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>>>>>> @Seth That's a very good point. I agree that RocksDB has the same
>>>>>> problem. I think we can use the same approach for the sorted shuffles
>>>>>> then. @Aljoscha I agree we should think about making it more resilient,
>>>>>> as I guess users might have problems already if they use keys with
>>>>>> non-deterministic binary representation. How do you feel about
>>>>>> addressing that separately purely to limit the scope of this FLIP?
>>>>>>
>>>>>> @Aljoscha I tend to agree with you that the best place to actually
>>> place
>>>>>> the sorting would be in the InputProcessor(s). If there are no more
>>>>>> suggestions in respect to that issue. I'll put this proposal for
>>> voting.
>>>>>> @all Thank you for the feedback so far. I'd like to start a voting
>>>>>> thread on the proposal tomorrow. Therefore I'd appreciate if you
>>> comment
>>>>>> before that, if you still have some outstanding ideas.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>>>>>>> Seth is right, I was just about to write that as well. There is a
>>>>>>> problem, though, because some of our TypeSerializers are not
>>>>>>> deterministic even though we use them as if they were. Beam excludes
>>>>>>> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
>>>>>>> pretty sure there is also weirdness going on in our KryoSerializer.
>>>>>>>
>>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>>>>>>>> There is already an implicit assumption the TypeSerializer for keys
>>> is
>>>>>>>> stable/deterministic, RocksDB compares keys using their serialized
>>> byte
>>>>>>>> strings. I think this is a non-issue (or at least it's not changing
>>> the
>>>>>>>> status quo).
>>>>>>>>
>>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <tw...@apache.org>
>>>>> wrote:
>>>>>>>>> +1 for getting rid of the TypeComparator interface and rely on the
>>>>>>>>> serialized representation for grouping.
>>>>>>>>>
>>>>>>>>> Adding a new type to DataStream API is quite difficult at the moment
>>>>>>>>> due
>>>>>>>>> to too many components that are required: TypeInformation (tries to
>>>>>>>>> deal
>>>>>>>>> with logical fields for TypeComparators), TypeSerializer (incl. it's
>>>>>>>>> snapshot interfaces), and TypeComparator (with many methods and
>>>>>>>>> internals such normalized keys etc.).
>>>>>>>>>
>>>>>>>>> If necessary, we can add more simple comparison-related methods to
>>> the
>>>>>>>>> TypeSerializer interface itself in the future (like
>>>>>>>>> TypeSerializer.isDeterministic).
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Timo
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>>>>>>>>>> Thanks for publishing the FLIP!
>>>>>>>>>>
>>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dw...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>>      1. How to sort/group keys? What representation of the key
>>>>>>>>>>> should we
>>>>>>>>>>>         use? Should we sort on the binary form or should we depend
>>> on
>>>>>>>>>>>         Comparators being available.
>>>>>>>>>> Initially, I suggested to Dawid (in private) to do the
>>>>>>>>>> sorting/grouping
>>>>>>>>> by using the binary representation. Then my opinion switched and I
>>>>>>>>> thought
>>>>>>>>> we should use TypeComparator/Comparator because that's what the
>>>>>>>>> DataSet API
>>>>>>>>> uses. After talking to Stephan, I'm again encouraged in my opinion
>>>>>>>>> to use
>>>>>>>>> the binary representation because it means we can eventually get rid
>>>>>>>>> of the
>>>>>>>>> TypeComparator interface, which is a bit complicated, and because we
>>>>>>>>> don't
>>>>>>>>> need any good order in our sort, we only need the grouping.
>>>>>>>>>> This comes with some problems, though: we need to ensure that the
>>>>>>>>> TypeSerializer of the type we're sorting is stable/deterministic.
>>>>>>>>> Beam has
>>>>>>>>> infrastructure for this in the form of Coder.verifyDeterministic()
>>> [1]
>>>>>>>>> which we don't have right now and should add if we go down this
>>> path.
>>>>>>>>>>>      2. Where in the stack should we apply the sorting (this
>>> rather a
>>>>>>>>>>>         discussion about internals)
>>>>>>>>>> Here, I'm gravitating towards the third option of implementing it
>>>>>>>>>> in the
>>>>>>>>> layer of the StreamTask, which probably means implementing a custom
>>>>>>>>> InputProcessor. I think it's best to do it in this layer because we
>>>>>>>>> would
>>>>>>>>> not mix concerns of different layers as we would if we implemented
>>>>>>>>> this as
>>>>>>>>> a custom StreamOperator. I think this solution is also best when it
>>>>>>>>> comes
>>>>>>>>> to multi-input operators.
>>>>>>>>>>>      3. How should we deal with custom implementations of
>>>>>>>>>>> StreamOperators
>>>>>>>>>> I think the cleanest solution would be to go through the complete
>>>>>>>>> operator lifecycle for every key, because then the watermark would
>>> not
>>>>>>>>> oscillate between -Inf and +Inf and we would not break the
>>> semantical
>>>>>>>>> guarantees that we gave to operators so far, in that the watermark
>>> is
>>>>>>>>> strictly monotonically increasing. However, I don't think this
>>>>>>>>> solution is
>>>>>>>>> feasible because it would come with too much overhead. We should
>>>>>>>>> solve this
>>>>>>>>> problem via documentation and maybe educate people to not query the
>>>>>>>>> current
>>>>>>>>> watermark or not rely on the watermark being monotonically
>>>>>>>>> increasing in
>>>>>>>>> operator implementations to allow the framework more freedoms in how
>>>>>>>>> user
>>>>>>>>> programs are executed.
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>>>
> 


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Dawid Wysakowicz <dw...@apache.org>.
Ad. 1

Yes, you are right in principle.

Let me clarify my proposal a bit, though. The proposed sort-style
execution aims at a generic KeyedProcessFunction where all the
"aggregations" are actually performed in the user code. It tries to
improve the performance by removing the need to use RocksDB, e.g.:

    private static final class Summer<K>
            extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {

        ....

        @Override
        public void processElement(
                Tuple2<K, Integer> value,
                Context ctx,
                Collector<Tuple2<K, Integer>> out) throws Exception {
            if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
                ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
                timerRegistered.update(true);
            }
            Integer v = counter.value();
            Integer incomingValue = value.f1;
            if (v != null) {
                v += incomingValue;
            } else {
                v = incomingValue;
            }
            counter.update(v);
        }

        ....

    }

Therefore I don't think the first part of your reply, about separating the
write and read workloads, applies here. We do not aim to create an API
that competes with the Table API. We think operations such as joins or
analytical aggregations should be performed in the Table API.

As for the second part, I agree it would be nice to fall back to the
sorting approach only once a State Backend exceeds a certain memory
threshold. This has some problems though. We would need a way to
estimate the size of the occupied memory to tell when the threshold is
reached. That is not easily doable, e.g. in a MemoryStateBackend, as we
do not serialize the values in that state backend by default. We would
have to add serialization, which brings its own overhead.

This proposal aims at the cases where we do have a large state that will
not fit into memory, and where without the change users are forced to use
RocksDB. If the state fits in memory, I agree it will be better to do
hash-based aggregations, e.g. using the MemoryStateBackend. Therefore I
think it is important to give users the choice to use one or the other
approach. We might discuss which approach should be the default for
RuntimeMode.BATCH proposed in FLIP-134: should it be hash-based with a
user-configured state backend, or sorting-based with a
single-key-at-a-time backend? Moreover, we could consider letting users
choose the sort vs hash "state backend" per operator. Would that suffice?

Ad. 2

I still think we can just use the first X bytes of the serialized form
as the normalized key and fall back to comparing full keys on clashes.
That is because we are not interested in a logical order; we care only
about the "grouping" aspect of the sorting. Therefore I think it's
enough to compare only a part of the full key as the normalized key.
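
To make the idea concrete, here is a minimal sketch of such a comparison.
It is only an illustration and not taken from the FLIP: the class name,
the method name and the prefix length of 8 (standing in for "X") are
assumptions. It compares the first bytes of two serialized keys and looks
at the full keys only when the prefixes clash.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: group serialized keys by comparing a fixed-length
// prefix first and the full byte strings only when the prefixes are equal.
final class PrefixGroupingSketch {

    // Assumed value for "X"; a real implementation would make this configurable.
    static final int PREFIX_LEN = 8;

    static int compareKeys(byte[] a, byte[] b) {
        int n = Math.min(PREFIX_LEN, Math.min(a.length, b.length));
        // Cheap path: unsigned lexicographic comparison of the prefixes.
        int c = Arrays.compareUnsigned(a, 0, n, b, 0, n);
        if (c != 0) {
            return c;
        }
        // Prefix clash: fall back to comparing the full serialized keys.
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        byte[][] keys = {
            "user-000042".getBytes(StandardCharsets.UTF_8),
            "abc".getBytes(StandardCharsets.UTF_8),
            "user-000041".getBytes(StandardCharsets.UTF_8),
            "abc".getBytes(StandardCharsets.UTF_8)
        };
        Arrays.sort(keys, PrefixGroupingSketch::compareKeys);
        // Equal keys are now adjacent, which is all the grouping needs.
        for (byte[] k : keys) {
            System.out.println(new String(k, StandardCharsets.UTF_8));
        }
    }
}
```

The resulting order is the byte order of the serialized form, which has
no logical meaning for most types. It only guarantees that equal
serialized keys end up next to each other.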

Thanks again for the really nice and thorough feedback!

Best,

Dawid

On 08/09/2020 14:47, Kurt Young wrote:
> Regarding #1, yes the state backend is definitely hash-based execution.
> However there are some differences between
> batch hash-based execution. The key difference is *random access &
> read/write mixed workload". For example, by using
> state backend in streaming execution, one have to mix the read and write
> operations and all of them are actually random
> access. But in a batch hash execution, we could divide the phases into
> write and read. For example, we can build the
> hash table first, with only write operations. And once the build is done,
> we can start to read and trigger the user codes.
> Take hash aggregation which blink planner implemented as an example, during
> building phase, as long as the hash map
> could fit into memory, we will update the accumulators directly in the hash
> map. And once we are running out of memory,
> we then fall back to sort based execution. It improves the performance a
> lot if the incoming data can be processed in
> memory.
>
> Regarding #2, IIUC you are actually describing a binary format of key, not
> normalized key which is used in DataSet. I will
> take String for example. If we have lots of keys with length all greater
> than, let's say 20. In your proposal, you will encode
> the whole string in the prefix of your composed data ( <key> + <timestamp>
> + <record> ). And when you compare
> records, you will actually compare the *whole* key of the record. For
> normalized key, it's fixed-length in this case, IIRC it will
> take 8 bytes to represent the string. And the sorter will store the
> normalized key and offset in a dedicated array. When doing
> the sorting, it only sorts this *small* array. If the normalized keys are
> different, you could immediately tell which is greater from
> normalized keys. You only have to compare the full keys if the normalized
> keys are equal and you know in this case the normalized
> key couldn't represent the full key. The reason why Dataset is doing this
> is it's super cache efficient by sorting the *small* array.
> The idea is borrowed from this paper [1]. Let me know if I missed or
> misunderstood anything.
>
> [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
> cache-sensitive parallel external sort)
>
> Best,
> Kurt
>
>
> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
>> Hey Kurt,
>>
>> Thank you for comments!
>>
>> Ad. 1 I might have missed something here, but as far as I see it is that
>> using the current execution stack with regular state backends (RocksDB
>> in particular if we want to have spilling capabilities) is equivalent to
>> hash-based execution. I can see a different spilling state backend
>> implementation in the future, but I think it is not batch specifc. Or am
>> I missing something?
>>
>> Ad. 2 Totally agree that normalized keys are important to the
>> performance. I think though TypeComparators are not a necessity to have
>> that. Actually  this proposal is heading towards only ever performing
>> "normalized keys" comparison. I have not included in the proposal the
>> binary format which we will use for sorting (partially because I forgot,
>> and partially because I thought it was too much of an implementation
>> detail). Let me include it here though, as it might clear the situation
>> a bit here.
>>
>> In DataSet, at times we have KeySelectors which extract keys based on
>> field indices or names. This allows in certain situation to extract the
>> key from serialized records. Compared to DataSet, in DataStream, the key
>> is always described with a black-box KeySelector, or differently with a
>> function which extracts a key from a deserialized record.  In turn there
>> is no way to create a comparator that could compare records by
>> extracting the key from a serialized record (neither with, nor without
>> key normalization). We suggest that the input for the sorter will be
>>
>> <key> + <timestamp> + <record>
>>
>> Without having the key prepended we would have to deserialize the record
>> for every key comparison.
>>
>> Therefore if we agree that we perform binary comparison for keys (which
>> are always prepended), it is actually equivalent to a DataSet with
>> TypeComparators that support key normalization.
>>
>> Let me know if that is clear, or I have missed something here.
>>
>> Best,
>>
>> Dawid
>>
>> On 08/09/2020 03:39, Kurt Young wrote:
>>> Hi Dawid, thanks for bringing this up, it's really exciting to see that
>>> batch execution is introduced in DataStream. From the flip, it seems
>>> we are sticking with sort based execution mode (at least for now), which
>>> will sort the whole input data before any *keyed* operation is
>>> executed. I have two comments here:
>>>
>>> 1. Do we want to introduce hash-based execution in the future? Sort is a
>>> safe choice but not the best in lots of cases. IIUC we only need
>>> to make sure that before the framework finishes dealing with one key, the
>>> operator doesn't see any data belonging to other keys, thus
>>> hash-based execution would also do the trick. Oon tricky thing the
>>> framework might need to deal with is memory constraint and spilling
>>> in the hash map, but Flink also has some good knowledge about these
>> stuff.
>>> 2. Going back to sort-based execution and how to sort keys. From my
>>> experience, the performance of sorting would be one the most important
>>> things if we want to achieve good performance of batch execution. And
>>> normalized keys are actually the key of the performance of sorting.
>>> If we want to get rid of TypeComparator, I think we still need to find a
>>> way to introduce this back.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <al...@apache.org>
>> wrote:
>>>> Yes, I think we can address the problem of indeterminacy in a separate
>>>> FLIP because we're already in it.
>>>>
>>>> Aljoscha
>>>>
>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>>>>> @Seth That's a very good point. I agree that RocksDB has the same
>>>>> problem. I think we can use the same approach for the sorted shuffles
>>>>> then. @Aljoscha I agree we should think about making it more resilient,
>>>>> as I guess users might have problems already if they use keys with
>>>>> non-deterministic binary representation. How do you feel about
>>>>> addressing that separately purely to limit the scope of this FLIP?
>>>>>
>>>>> @Aljoscha I tend to agree with you that the best place to actually
>> place
>>>>> the sorting would be in the InputProcessor(s). If there are no more
>>>>> suggestions in respect to that issue. I'll put this proposal for
>> voting.
>>>>> @all Thank you for the feedback so far. I'd like to start a voting
>>>>> thread on the proposal tomorrow. Therefore I'd appreciate if you
>> comment
>>>>> before that, if you still have some outstanding ideas.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>>>>>> Seth is right, I was just about to write that as well. There is a
>>>>>> problem, though, because some of our TypeSerializers are not
>>>>>> deterministic even though we use them as if they were. Beam excludes
>>>>>> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
>>>>>> pretty sure there is also weirdness going on in our KryoSerializer.
>>>>>>
>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>>>>>>> There is already an implicit assumption the TypeSerializer for keys
>> is
>>>>>>> stable/deterministic, RocksDB compares keys using their serialized
>> byte
>>>>>>> strings. I think this is a non-issue (or at least it's not changing
>> the
>>>>>>> status quo).
>>>>>>>
>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <tw...@apache.org>
>>>> wrote:
>>>>>>>> +1 for getting rid of the TypeComparator interface and rely on the
>>>>>>>> serialized representation for grouping.
>>>>>>>>
>>>>>>>> Adding a new type to DataStream API is quite difficult at the moment
>>>>>>>> due
>>>>>>>> to too many components that are required: TypeInformation (tries to
>>>>>>>> deal
>>>>>>>> with logical fields for TypeComparators), TypeSerializer (incl. it's
>>>>>>>> snapshot interfaces), and TypeComparator (with many methods and
>>>>>>>> internals such normalized keys etc.).
>>>>>>>>
>>>>>>>> If necessary, we can add more simple comparison-related methods to
>> the
>>>>>>>> TypeSerializer interface itself in the future (like
>>>>>>>> TypeSerializer.isDeterministic).
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>>
>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>>>>>>>>> Thanks for publishing the FLIP!
>>>>>>>>>
>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dw...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>>     1. How to sort/group keys? What representation of the key
>>>>>>>>>> should we
>>>>>>>>>>        use? Should we sort on the binary form or should we depend
>> on
>>>>>>>>>>        Comparators being available.
>>>>>>>>> Initially, I suggested to Dawid (in private) to do the
>>>>>>>>> sorting/grouping
>>>>>>>> by using the binary representation. Then my opinion switched and I
>>>>>>>> thought
>>>>>>>> we should use TypeComparator/Comparator because that's what the
>>>>>>>> DataSet API
>>>>>>>> uses. After talking to Stephan, I'm again encouraged in my opinion
>>>>>>>> to use
>>>>>>>> the binary representation because it means we can eventually get rid
>>>>>>>> of the
>>>>>>>> TypeComparator interface, which is a bit complicated, and because we
>>>>>>>> don't
>>>>>>>> need any good order in our sort, we only need the grouping.
>>>>>>>>> This comes with some problems, though: we need to ensure that the
>>>>>>>> TypeSerializer of the type we're sorting is stable/deterministic.
>>>>>>>> Beam has
>>>>>>>> infrastructure for this in the form of Coder.verifyDeterministic()
>> [1]
>>>>>>>> which we don't have right now and should add if we go down this
>> path.
>>>>>>>>>>     2. Where in the stack should we apply the sorting (this
>> rather a
>>>>>>>>>>        discussion about internals)
>>>>>>>>> Here, I'm gravitating towards the third option of implementing it
>>>>>>>>> in the
>>>>>>>> layer of the StreamTask, which probably means implementing a custom
>>>>>>>> InputProcessor. I think it's best to do it in this layer because we
>>>>>>>> would
>>>>>>>> not mix concerns of different layers as we would if we implemented
>>>>>>>> this as
>>>>>>>> a custom StreamOperator. I think this solution is also best when it
>>>>>>>> comes
>>>>>>>> to multi-input operators.
>>>>>>>>>>     3. How should we deal with custom implementations of
>>>>>>>>>> StreamOperators
>>>>>>>>> I think the cleanest solution would be to go through the complete
>>>>>>>> operator lifecycle for every key, because then the watermark would
>> not
>>>>>>>> oscillate between -Inf and +Inf and we would not break the
>> semantical
>>>>>>>> guarantees that we gave to operators so far, in that the watermark
>> is
>>>>>>>> strictly monotonically increasing. However, I don't think this
>>>>>>>> solution is
>>>>>>>> feasible because it would come with too much overhead. We should
>>>>>>>> solve this
>>>>>>>> problem via documentation and maybe educate people to not query the
>>>>>>>> current
>>>>>>>> watermark or not rely on the watermark being monotonically
>>>>>>>> increasing in
>>>>>>>> operator implementations to allow the framework more freedoms in how
>>>>>>>> user
>>>>>>>> programs are executed.
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>>


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Kurt Young <yk...@gmail.com>.
Regarding #1, yes, the state backend is definitely hash-based execution.
However, there are some differences from batch hash-based execution. The
key difference is *random access & read/write mixed workload*. For
example, when using a state backend in streaming execution, one has to
mix the read and write operations, and all of them are actually random
access. But in a batch hash execution, we could divide the phases into
write and read. For example, we can build the hash table first, with
only write operations. And once the build is done, we can start to read
and trigger the user code. Take the hash aggregation that the blink
planner implemented as an example: during the building phase, as long as
the hash map fits into memory, we update the accumulators directly in
the hash map. Once we run out of memory, we fall back to sort-based
execution. It improves the performance a lot if the incoming data can be
processed in memory.
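
A rough sketch of this two-phase idea, for illustration only. It is not
the blink planner code: the class and method names are made up, a simple
limit on the number of keys stands in for a real memory budget, and the
"spill" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: sum values per key in a hash map while it fits the
// budget; records that no longer fit are buffered, sorted by key and
// aggregated one key at a time.
final class HashThenSortAggSketch {

    static Map<String, Integer> sumByKey(
            List<Map.Entry<String, Integer>> input, int maxKeysInMemory) {
        Map<String, Integer> hashAgg = new HashMap<>();
        List<Map.Entry<String, Integer>> overflow = new ArrayList<>();

        // Build phase: only writes to the accumulators, no output yet.
        for (Map.Entry<String, Integer> rec : input) {
            if (hashAgg.containsKey(rec.getKey()) || hashAgg.size() < maxKeysInMemory) {
                hashAgg.merge(rec.getKey(), rec.getValue(), Integer::sum);
            } else {
                overflow.add(rec); // budget exhausted: defer to the sort path
            }
        }

        // Read phase for the hash part.
        Map<String, Integer> result = new LinkedHashMap<>(hashAgg);

        // Sort-based fallback: after sorting, each key forms one contiguous run.
        overflow.sort(Map.Entry.comparingByKey());
        String currentKey = null;
        int sum = 0;
        for (Map.Entry<String, Integer> rec : overflow) {
            if (!rec.getKey().equals(currentKey)) {
                if (currentKey != null) {
                    result.put(currentKey, sum);
                }
                currentKey = rec.getKey();
                sum = 0;
            }
            sum += rec.getValue();
        }
        if (currentKey != null) {
            result.put(currentKey, sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3),
                Map.entry("c", 4), Map.entry("c", 5));
        System.out.println(sumByKey(input, 1));
    }
}
```

A key that already lives in the hash map never goes to the overflow list,
so the two paths produce disjoint sets of keys and the results can simply
be combined.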

Regarding #2, IIUC you are actually describing a binary format of the
key, not the normalized key which is used in DataSet. Take String as an
example, and suppose we have lots of keys, all longer than, let's say,
20. In your proposal, you will encode the whole string in the
prefix of your composed data ( <key> + <timestamp> + <record> ). And
when you compare records, you will actually compare the *whole* key of
the record. A normalized key is fixed-length in this case; IIRC it
takes 8 bytes to represent the string. The sorter stores the normalized
key and offset in a dedicated array. When doing the sorting, it only
sorts this *small* array. If the normalized keys are different, you can
immediately tell which is greater from the normalized keys. You only have
to compare the full keys if the normalized keys are equal and you know
that in this case the normalized key couldn't represent the full key.
The reason why DataSet does this is that sorting the *small* array is
super cache efficient. The idea is borrowed from this paper [1]. Let me
know if I missed or misunderstood anything.
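
A simplified sketch of the layout described above, to show what I mean.
It is not the actual DataSet sorter: the names are made up, the 8-byte
width follows my recollection above, and plain Java arrays stand in for
the memory segments a real sorter would use. Only the small array of
normalized keys and offsets is sorted; the full keys are touched only on
ties.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of normalized-key sorting: sort (normalized key,
// offset) pairs and consult the full key only when normalized keys are equal.
final class NormalizedKeySortSketch {

    static final int NORMALIZED_KEY_LEN = 8; // assumed fixed width

    // Packs the first 8 bytes (zero padded) big-endian into a long, so that
    // unsigned long comparison matches unsigned byte-wise comparison.
    static long normalizedKey(byte[] key) {
        long nk = 0;
        for (int i = 0; i < NORMALIZED_KEY_LEN; i++) {
            int b = i < key.length ? (key[i] & 0xFF) : 0;
            nk = (nk << 8) | b;
        }
        return nk;
    }

    // Returns the record offsets (indices into keys) in sorted order.
    static Integer[] sortedOffsets(byte[][] keys) {
        long[] normalized = new long[keys.length];
        Integer[] offsets = new Integer[keys.length];
        for (int i = 0; i < keys.length; i++) {
            normalized[i] = normalizedKey(keys[i]);
            offsets[i] = i;
        }
        Arrays.sort(offsets, (i, j) -> {
            int c = Long.compareUnsigned(normalized[i], normalized[j]);
            if (c != 0) {
                return c; // decided without touching the full keys
            }
            return Arrays.compareUnsigned(keys[i], keys[j]); // tie: full comparison
        });
        return offsets;
    }

    public static void main(String[] args) {
        byte[][] keys = {
            "banana-long-key-2".getBytes(StandardCharsets.UTF_8),
            "apple".getBytes(StandardCharsets.UTF_8),
            "banana-long-key-1".getBytes(StandardCharsets.UTF_8),
            "apple".getBytes(StandardCharsets.UTF_8)
        };
        System.out.println(Arrays.toString(sortedOffsets(keys)));
    }
}
```

With keys that share a long common prefix, most comparisons fall through
to the full key, so the benefit depends heavily on the key distribution.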

[1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
cache-sensitive parallel external sort)

Best,
Kurt


On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hey Kurt,
>
> Thank you for comments!
>
> Ad. 1 I might have missed something here, but as far as I see it is that
> using the current execution stack with regular state backends (RocksDB
> in particular if we want to have spilling capabilities) is equivalent to
> hash-based execution. I can see a different spilling state backend
> implementation in the future, but I think it is not batch specifc. Or am
> I missing something?
>
> Ad. 2 Totally agree that normalized keys are important to the
> performance. I think though TypeComparators are not a necessity to have
> that. Actually  this proposal is heading towards only ever performing
> "normalized keys" comparison. I have not included in the proposal the
> binary format which we will use for sorting (partially because I forgot,
> and partially because I thought it was too much of an implementation
> detail). Let me include it here though, as it might clear the situation
> a bit here.
>
> In DataSet, at times we have KeySelectors which extract keys based on
> field indices or names. This allows in certain situation to extract the
> key from serialized records. Compared to DataSet, in DataStream, the key
> is always described with a black-box KeySelector, or differently with a
> function which extracts a key from a deserialized record.  In turn there
> is no way to create a comparator that could compare records by
> extracting the key from a serialized record (neither with, nor without
> key normalization). We suggest that the input for the sorter will be
>
> <key> + <timestamp> + <record>
>
> Without having the key prepended we would have to deserialize the record
> for every key comparison.
>
> Therefore if we agree that we perform binary comparison for keys (which
> are always prepended), it is actually equivalent to a DataSet with
> TypeComparators that support key normalization.
>
> Let me know if that is clear, or I have missed something here.
>
> Best,
>
> Dawid
>
> On 08/09/2020 03:39, Kurt Young wrote:
> > Hi Dawid, thanks for bringing this up, it's really exciting to see that
> > batch execution is introduced in DataStream. From the flip, it seems
> > we are sticking with sort based execution mode (at least for now), which
> > will sort the whole input data before any *keyed* operation is
> > executed. I have two comments here:
> >
> > 1. Do we want to introduce hash-based execution in the future? Sort is a
> > safe choice but not the best in lots of cases. IIUC we only need
> > to make sure that before the framework finishes dealing with one key, the
> > operator doesn't see any data belonging to other keys, thus
> > hash-based execution would also do the trick. One tricky thing the
> > framework might need to deal with is memory constraint and spilling
> > in the hash map, but Flink also has some good knowledge about these
> stuff.
> >
> > 2. Going back to sort-based execution and how to sort keys. From my
> > experience, the performance of sorting would be one of the most important
> > things if we want to achieve good performance of batch execution. And
> > normalized keys are actually the key of the performance of sorting.
> > If we want to get rid of TypeComparator, I think we still need to find a
> > way to introduce this back.
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <al...@apache.org>
> wrote:
> >
> >> Yes, I think we can address the problem of indeterminacy in a separate
> >> FLIP because we're already in it.
> >>
> >> Aljoscha
> >>
> >> On 07.09.20 17:00, Dawid Wysakowicz wrote:
> >>> @Seth That's a very good point. I agree that RocksDB has the same
> >>> problem. I think we can use the same approach for the sorted shuffles
> >>> then. @Aljoscha I agree we should think about making it more resilient,
> >>> as I guess users might have problems already if they use keys with
> >>> non-deterministic binary representation. How do you feel about
> >>> addressing that separately purely to limit the scope of this FLIP?
> >>>
> >>> @Aljoscha I tend to agree with you that the best place to actually
> place
> >>> the sorting would be in the InputProcessor(s). If there are no more
> >>> suggestions in respect to that issue. I'll put this proposal for
> voting.
> >>>
> >>> @all Thank you for the feedback so far. I'd like to start a voting
> >>> thread on the proposal tomorrow. Therefore I'd appreciate if you
> comment
> >>> before that, if you still have some outstanding ideas.
> >>>
> >>> Best,
> >>>
> >>> Dawid
> >>>
> >>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
> >>>> Seth is right, I was just about to write that as well. There is a
> >>>> problem, though, because some of our TypeSerializers are not
> >>>> deterministic even though we use them as if they were. Beam excludes
> >>>> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
> >>>> pretty sure there is also weirdness going on in our KryoSerializer.
> >>>>
> >>>> On 04.09.20 14:59, Seth Wiesman wrote:
> >>>>> There is already an implicit assumption the TypeSerializer for keys
> is
> >>>>> stable/deterministic, RocksDB compares keys using their serialized
> byte
> >>>>> strings. I think this is a non-issue (or at least it's not changing
> the
> >>>>> status quo).
> >>>>>
> >>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <tw...@apache.org>
> >> wrote:
> >>>>>> +1 for getting rid of the TypeComparator interface and rely on the
> >>>>>> serialized representation for grouping.
> >>>>>>
> >>>>>> Adding a new type to DataStream API is quite difficult at the moment
> >>>>>> due
> >>>>>> to too many components that are required: TypeInformation (tries to
> >>>>>> deal
> >>>>>> with logical fields for TypeComparators), TypeSerializer (incl. it's
> >>>>>> snapshot interfaces), and TypeComparator (with many methods and
> >>>>>> internals such normalized keys etc.).
> >>>>>>
> >>>>>> If necessary, we can add more simple comparison-related methods to
> the
> >>>>>> TypeSerializer interface itself in the future (like
> >>>>>> TypeSerializer.isDeterministic).
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
> >>>>>>> Thanks for publishing the FLIP!
> >>>>>>>
> >>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dw...@apache.org>
> >>>>>>> wrote:
> >>>>>>>>     1. How to sort/group keys? What representation of the key
> >>>>>>>> should we
> >>>>>>>>        use? Should we sort on the binary form or should we depend
> on
> >>>>>>>>        Comparators being available.
> >>>>>>> Initially, I suggested to Dawid (in private) to do the
> >>>>>>> sorting/grouping
> >>>>>> by using the binary representation. Then my opinion switched and I
> >>>>>> thought
> >>>>>> we should use TypeComparator/Comparator because that's what the
> >>>>>> DataSet API
> >>>>>> uses. After talking to Stephan, I'm again encouraged in my opinion
> >>>>>> to use
> >>>>>> the binary representation because it means we can eventually get rid
> >>>>>> of the
> >>>>>> TypeComparator interface, which is a bit complicated, and because we
> >>>>>> don't
> >>>>>> need any good order in our sort, we only need the grouping.
> >>>>>>> This comes with some problems, though: we need to ensure that the
> >>>>>> TypeSerializer of the type we're sorting is stable/deterministic.
> >>>>>> Beam has
> >>>>>> infrastructure for this in the form of Coder.verifyDeterministic()
> [1]
> >>>>>> which we don't have right now and should add if we go down this
> path.
> >>>>>>>>     2. Where in the stack should we apply the sorting (this
> rather a
> >>>>>>>>        discussion about internals)
> >>>>>>> Here, I'm gravitating towards the third option of implementing it
> >>>>>>> in the
> >>>>>> layer of the StreamTask, which probably means implementing a custom
> >>>>>> InputProcessor. I think it's best to do it in this layer because we
> >>>>>> would
> >>>>>> not mix concerns of different layers as we would if we implemented
> >>>>>> this as
> >>>>>> a custom StreamOperator. I think this solution is also best when it
> >>>>>> comes
> >>>>>> to multi-input operators.
> >>>>>>>>     3. How should we deal with custom implementations of
> >>>>>>>> StreamOperators
> >>>>>>> I think the cleanest solution would be to go through the complete
> >>>>>> operator lifecycle for every key, because then the watermark would
> not
> >>>>>> oscillate between -Inf and +Inf and we would not break the
> semantical
> >>>>>> guarantees that we gave to operators so far, in that the watermark
> is
> >>>>>> strictly monotonically increasing. However, I don't think this
> >>>>>> solution is
> >>>>>> feasible because it would come with too much overhead. We should
> >>>>>> solve this
> >>>>>> problem via documentation and maybe educate people to not query the
> >>>>>> current
> >>>>>> watermark or not rely on the watermark being monotonically
> >>>>>> increasing in
> >>>>>> operator implementations to allow the framework more freedoms in how
> >>>>>> user
> >>>>>> programs are executed.
> >>>>>>> Aljoscha
> >>>>>>>
> >>>>>>> [1]
> >>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
> >>>>>>
> >>
>
>

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey Kurt,

Thank you for the comments!

Ad. 1 I might have missed something here, but as far as I can see, using
the current execution stack with regular state backends (RocksDB in
particular, if we want spilling capabilities) is equivalent to
hash-based execution. I can imagine a different spilling state backend
implementation in the future, but I don't think it is batch-specific. Or
am I missing something?

Ad. 2 I totally agree that normalized keys are important for
performance. I don't think TypeComparators are necessary for that,
though. Actually, this proposal is heading towards only ever performing
"normalized key" comparisons. I did not include in the proposal the
binary format which we will use for sorting (partially because I forgot,
and partially because I thought it was too much of an implementation
detail). Let me include it here though, as it might clarify the
situation a bit.

In DataSet, we sometimes have KeySelectors which extract keys based on
field indices or names. In certain situations this allows extracting the
key from serialized records. In DataStream, by contrast, the key is
always described with a black-box KeySelector, in other words with a
function which extracts a key from a deserialized record. As a result,
there is no way to create a comparator that could compare records by
extracting the key from a serialized record (neither with nor without
key normalization). We suggest that the input for the sorter will be

<key> + <timestamp> + <record>

Without having the key prepended we would have to deserialize the record
for every key comparison.

Therefore if we agree that we perform binary comparison for keys (which
are always prepended), it is actually equivalent to a DataSet with
TypeComparators that support key normalization.
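To make the layout concrete, here is a minimal, language-neutral sketch
in Python. It is not Flink code: the function names, the 4-byte length
prefix on the key, and the sign-flipped big-endian timestamp encoding
are all assumptions made for illustration. It only shows that once the
key is prepended, plain byte-wise comparison clusters equal keys
together without deserializing the record.

```python
import struct

SIGN_BIAS = 1 << 63  # maps signed 64-bit timestamps onto unsigned order


def encode_entry(key: bytes, timestamp: int, record: bytes) -> bytes:
    """Lay out one sorter entry as <key> + <timestamp> + <record>."""
    # Hypothetical choice: length-prefix the key so that one key can never
    # be a byte-prefix of another and bleed into the timestamp bytes.
    key_part = struct.pack(">I", len(key)) + key
    # Big-endian with a sign bias, so byte order equals numeric order.
    ts_part = struct.pack(">Q", timestamp + SIGN_BIAS)
    return key_part + ts_part + record


def decode_entry(entry: bytes):
    """Inverse of encode_entry; only needed once the entry is emitted."""
    (key_len,) = struct.unpack_from(">I", entry, 0)
    key = entry[4:4 + key_len]
    (biased_ts,) = struct.unpack_from(">Q", entry, 4 + key_len)
    record = entry[4 + key_len + 8:]
    return key, biased_ts - SIGN_BIAS, record


events = [
    (b"user-b", 30, b"r1"),
    (b"user-a", 20, b"r2"),
    (b"user-b", 10, b"r3"),
    (b"user-a", 5, b"r4"),
]

# Sorting compares raw bytes only; no record is deserialized for comparison.
sorted_entries = sorted(encode_entry(k, t, r) for k, t, r in events)
decoded = [decode_entry(e) for e in sorted_entries]
```

In this sketch the resulting order is by key first and by timestamp
within a key. The order between different keys carries no meaning; only
the clustering of equal keys matters.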

Let me know if that is clear, or if I have missed something here.

Best,

Dawid

On 08/09/2020 03:39, Kurt Young wrote:
> Hi Dawid, thanks for bringing this up, it's really exciting to see that
> batch execution is introduced in DataStream. From the flip, it seems
> we are sticking with sort based execution mode (at least for now), which
> will sort the whole input data before any *keyed* operation is
> executed. I have two comments here:
>
> 1. Do we want to introduce hash-based execution in the future? Sort is a
> safe choice but not the best in lots of cases. IIUC we only need
> to make sure that before the framework finishes dealing with one key, the
> operator doesn't see any data belonging to other keys, thus
> hash-based execution would also do the trick. One tricky thing the
> framework might need to deal with is memory constraint and spilling
> in the hash map, but Flink also has some good knowledge about these stuff.
>
> 2. Going back to sort-based execution and how to sort keys. From my
> experience, the performance of sorting would be one of the most important
> things if we want to achieve good performance of batch execution. And
> normalized keys are actually the key of the performance of sorting.
> If we want to get rid of TypeComparator, I think we still need to find a
> way to introduce this back.
>
> Best,
> Kurt
>
>
> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <al...@apache.org> wrote:
>
>> Yes, I think we can address the problem of indeterminacy in a separate
>> FLIP because we're already in it.
>>
>> Aljoscha
>>
>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>>> @Seth That's a very good point. I agree that RocksDB has the same
>>> problem. I think we can use the same approach for the sorted shuffles
>>> then. @Aljoscha I agree we should think about making it more resilient,
>>> as I guess users might have problems already if they use keys with
>>> non-deterministic binary representation. How do you feel about
>>> addressing that separately purely to limit the scope of this FLIP?
>>>
>>> @Aljoscha I tend to agree with you that the best place to actually place
>>> the sorting would be in the InputProcessor(s). If there are no more
>>> suggestions in respect to that issue. I'll put this proposal for voting.
>>>
>>> @all Thank you for the feedback so far. I'd like to start a voting
>>> thread on the proposal tomorrow. Therefore I'd appreciate if you comment
>>> before that, if you still have some outstanding ideas.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>>>> Seth is right, I was just about to write that as well. There is a
>>>> problem, though, because some of our TypeSerializers are not
>>>> deterministic even though we use them as if they were. Beam excludes
>>>> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
>>>> pretty sure there is also weirdness going on in our KryoSerializer.
>>>>
>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>>>>> There is already an implicit assumption the TypeSerializer for keys is
>>>>> stable/deterministic, RocksDB compares keys using their serialized byte
>>>>> strings. I think this is a non-issue (or at least it's not changing the
>>>>> status quo).
>>>>>
>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <tw...@apache.org>
>> wrote:
>>>>>> +1 for getting rid of the TypeComparator interface and rely on the
>>>>>> serialized representation for grouping.
>>>>>>
>>>>>> Adding a new type to DataStream API is quite difficult at the moment
>>>>>> due
>>>>>> to too many components that are required: TypeInformation (tries to
>>>>>> deal
>>>>>> with logical fields for TypeComparators), TypeSerializer (incl. it's
>>>>>> snapshot interfaces), and TypeComparator (with many methods and
>>>>>> internals such normalized keys etc.).
>>>>>>
>>>>>> If necessary, we can add more simple comparison-related methods to the
>>>>>> TypeSerializer interface itself in the future (like
>>>>>> TypeSerializer.isDeterministic).
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>>>>>>> Thanks for publishing the FLIP!
>>>>>>>
>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dw...@apache.org>
>>>>>>> wrote:
>>>>>>>>     1. How to sort/group keys? What representation of the key
>>>>>>>> should we
>>>>>>>>        use? Should we sort on the binary form or should we depend on
>>>>>>>>        Comparators being available.
>>>>>>> Initially, I suggested to Dawid (in private) to do the
>>>>>>> sorting/grouping
>>>>>> by using the binary representation. Then my opinion switched and I
>>>>>> thought
>>>>>> we should use TypeComparator/Comparator because that's what the
>>>>>> DataSet API
>>>>>> uses. After talking to Stephan, I'm again encouraged in my opinion
>>>>>> to use
>>>>>> the binary representation because it means we can eventually get rid
>>>>>> of the
>>>>>> TypeComparator interface, which is a bit complicated, and because we
>>>>>> don't
>>>>>> need any good order in our sort, we only need the grouping.
>>>>>>> This comes with some problems, though: we need to ensure that the
>>>>>> TypeSerializer of the type we're sorting is stable/deterministic.
>>>>>> Beam has
>>>>>> infrastructure for this in the form of Coder.verifyDeterministic() [1]
>>>>>> which we don't have right now and should add if we go down this path.
>>>>>>>>     2. Where in the stack should we apply the sorting (this rather a
>>>>>>>>        discussion about internals)
>>>>>>> Here, I'm gravitating towards the third option of implementing it
>>>>>>> in the
>>>>>> layer of the StreamTask, which probably means implementing a custom
>>>>>> InputProcessor. I think it's best to do it in this layer because we
>>>>>> would
>>>>>> not mix concerns of different layers as we would if we implemented
>>>>>> this as
>>>>>> a custom StreamOperator. I think this solution is also best when it
>>>>>> comes
>>>>>> to multi-input operators.
>>>>>>>>     3. How should we deal with custom implementations of
>>>>>>>> StreamOperators
>>>>>>> I think the cleanest solution would be to go through the complete
>>>>>> operator lifecycle for every key, because then the watermark would not
>>>>>> oscillate between -Inf and +Inf and we would not break the semantical
>>>>>> guarantees that we gave to operators so far, in that the watermark is
>>>>>> strictly monotonically increasing. However, I don't think this
>>>>>> solution is
>>>>>> feasible because it would come with too much overhead. We should
>>>>>> solve this
>>>>>> problem via documentation and maybe educate people to not query the
>>>>>> current
>>>>>> watermark or not rely on the watermark being monotonically
>>>>>> increasing in
>>>>>> operator implementations to allow the framework more freedoms in how
>>>>>> user
>>>>>> programs are executed.
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>>>>>>
>>


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Kurt Young <yk...@gmail.com>.
Hi Dawid, thanks for bringing this up, it's really exciting to see that
batch execution is introduced in DataStream. From the FLIP, it seems
we are sticking with a sort-based execution mode (at least for now), which
will sort the whole input data before any *keyed* operation is
executed. I have two comments here:

1. Do we want to introduce hash-based execution in the future? Sort is a
safe choice but not the best in lots of cases. IIUC we only need
to make sure that before the framework finishes dealing with one key, the
operator doesn't see any data belonging to other keys, thus
hash-based execution would also do the trick. One tricky thing the
framework might need to deal with is memory constraints and spilling
in the hash map, but Flink also has some good knowledge about this.

2. Going back to sort-based execution and how to sort keys. From my
experience, the performance of sorting would be one of the most important
things if we want to achieve good performance of batch execution. And
normalized keys are actually the key to sorting performance.
If we want to get rid of TypeComparator, I think we still need to find a
way to bring this back.
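As a rough illustration of point 1, the sketch below (plain Python, not
Flink code; all function names are made up for this example) contrasts
sort-based and hash-based grouping. Both satisfy the only property the
operator needs, namely that all records of one key arrive contiguously.
The hash variant keeps everything in memory and ignores spilling, which
is exactly the hard part mentioned above.

```python
from collections import defaultdict


def group_by_sort(records, key_fn):
    """Cluster records by sorting; needs an ordering on the key."""
    return sorted(records, key=key_fn)


def group_by_hash(records, key_fn):
    """Cluster records with a hash map; needs only equality and hashing."""
    buckets = defaultdict(list)
    for record in records:
        buckets[key_fn(record)].append(record)
    # Emit one complete key after another (no spilling in this sketch).
    for bucket in buckets.values():
        yield from bucket


def is_clustered(records, key_fn):
    """True if no key reappears after a different key has been seen."""
    finished = set()
    previous = object()  # sentinel that equals no real key
    for record in records:
        key = key_fn(record)
        if key != previous:
            if key in finished:
                return False
            finished.add(key)
            previous = key
    return True


def first_field(record):
    return record[0]


records = [("b", 1), ("a", 2), ("b", 3), ("c", 4), ("a", 5)]
by_sort = group_by_sort(records, first_field)
by_hash = list(group_by_hash(records, first_field))
```

Both outputs pass `is_clustered`, while the raw input does not. The two
differ only in the order in which the keys appear, which the operator
does not depend on.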

Best,
Kurt


On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <al...@apache.org> wrote:

> Yes, I think we can address the problem of indeterminacy in a separate
> FLIP because we're already in it.
>
> Aljoscha
>
> On 07.09.20 17:00, Dawid Wysakowicz wrote:
> > @Seth That's a very good point. I agree that RocksDB has the same
> > problem. I think we can use the same approach for the sorted shuffles
> > then. @Aljoscha I agree we should think about making it more resilient,
> > as I guess users might have problems already if they use keys with
> > non-deterministic binary representation. How do you feel about
> > addressing that separately purely to limit the scope of this FLIP?
> >
> > @Aljoscha I tend to agree with you that the best place to actually place
> > the sorting would be in the InputProcessor(s). If there are no more
> > suggestions in respect to that issue. I'll put this proposal for voting.
> >
> > @all Thank you for the feedback so far. I'd like to start a voting
> > thread on the proposal tomorrow. Therefore I'd appreciate if you comment
> > before that, if you still have some outstanding ideas.
> >
> > Best,
> >
> > Dawid
> >
> > On 04/09/2020 17:13, Aljoscha Krettek wrote:
> >> Seth is right, I was just about to write that as well. There is a
> >> problem, though, because some of our TypeSerializers are not
> >> deterministic even though we use them as if they were. Beam excludes
> >> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
> >> pretty sure there is also weirdness going on in our KryoSerializer.
> >>
> >> On 04.09.20 14:59, Seth Wiesman wrote:
> >>> There is already an implicit assumption the TypeSerializer for keys is
> >>> stable/deterministic, RocksDB compares keys using their serialized byte
> >>> strings. I think this is a non-issue (or at least it's not changing the
> >>> status quo).
> >>>
> >>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <tw...@apache.org>
> wrote:
> >>>
> >>>> +1 for getting rid of the TypeComparator interface and rely on the
> >>>> serialized representation for grouping.
> >>>>
> >>>> Adding a new type to DataStream API is quite difficult at the moment
> >>>> due
> >>>> to too many components that are required: TypeInformation (tries to
> >>>> deal
> >>>> with logical fields for TypeComparators), TypeSerializer (incl. it's
> >>>> snapshot interfaces), and TypeComparator (with many methods and
> >>>> internals such normalized keys etc.).
> >>>>
> >>>> If necessary, we can add more simple comparison-related methods to the
> >>>> TypeSerializer interface itself in the future (like
> >>>> TypeSerializer.isDeterministic).
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
> >>>>> Thanks for publishing the FLIP!
> >>>>>
> >>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dw...@apache.org>
> >>>>> wrote:
> >>>>>>     1. How to sort/group keys? What representation of the key
> >>>>>> should we
> >>>>>>        use? Should we sort on the binary form or should we depend on
> >>>>>>        Comparators being available.
> >>>>>
> >>>>> Initially, I suggested to Dawid (in private) to do the
> >>>>> sorting/grouping
> >>>> by using the binary representation. Then my opinion switched and I
> >>>> thought
> >>>> we should use TypeComparator/Comparator because that's what the
> >>>> DataSet API
> >>>> uses. After talking to Stephan, I'm again encouraged in my opinion
> >>>> to use
> >>>> the binary representation because it means we can eventually get rid
> >>>> of the
> >>>> TypeComparator interface, which is a bit complicated, and because we
> >>>> don't
> >>>> need any good order in our sort, we only need the grouping.
> >>>>>
> >>>>> This comes with some problems, though: we need to ensure that the
> >>>> TypeSerializer of the type we're sorting is stable/deterministic.
> >>>> Beam has
> >>>> infrastructure for this in the form of Coder.verifyDeterministic() [1]
> >>>> which we don't have right now and should add if we go down this path.
> >>>>>
> >>>>>>     2. Where in the stack should we apply the sorting (this rather a
> >>>>>>        discussion about internals)
> >>>>>
> >>>>> Here, I'm gravitating towards the third option of implementing it
> >>>>> in the
> >>>> layer of the StreamTask, which probably means implementing a custom
> >>>> InputProcessor. I think it's best to do it in this layer because we
> >>>> would
> >>>> not mix concerns of different layers as we would if we implemented
> >>>> this as
> >>>> a custom StreamOperator. I think this solution is also best when it
> >>>> comes
> >>>> to multi-input operators.
> >>>>>
> >>>>>>     3. How should we deal with custom implementations of
> >>>>>> StreamOperators
> >>>>>
> >>>>> I think the cleanest solution would be to go through the complete
> >>>> operator lifecycle for every key, because then the watermark would not
> >>>> oscillate between -Inf and +Inf and we would not break the semantical
> >>>> guarantees that we gave to operators so far, in that the watermark is
> >>>> strictly monotonically increasing. However, I don't think this
> >>>> solution is
> >>>> feasible because it would come with too much overhead. We should
> >>>> solve this
> >>>> problem via documentation and maybe educate people to not query the
> >>>> current
> >>>> watermark or not rely on the watermark being monotonically
> >>>> increasing in
> >>>> operator implementations to allow the framework more freedoms in how
> >>>> user
> >>>> programs are executed.
> >>>>>
> >>>>> Aljoscha
> >>>>>
> >>>>> [1]
> >>>>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
> >>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, I think we can address the problem of indeterminacy in a separate
FLIP because we already have that problem today.

Aljoscha

On 07.09.20 17:00, Dawid Wysakowicz wrote:
> @Seth That's a very good point. I agree that RocksDB has the same
> problem. I think we can use the same approach for the sorted shuffles
> then. @Aljoscha I agree we should think about making it more resilient,
> as I guess users might have problems already if they use keys with
> non-deterministic binary representation. How do you feel about
> addressing that separately purely to limit the scope of this FLIP?
> 
> @Aljoscha I tend to agree with you that the best place to actually place
> the sorting would be in the InputProcessor(s). If there are no more
> suggestions in respect to that issue. I'll put this proposal for voting.
> 
> @all Thank you for the feedback so far. I'd like to start a voting
> thread on the proposal tomorrow. Therefore I'd appreciate if you comment
> before that, if you still have some outstanding ideas.
> 
> Best,
> 
> Dawid
> 
> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>> Seth is right, I was just about to write that as well. There is a
>> problem, though, because some of our TypeSerializers are not
>> deterministic even though we use them as if they were. Beam excludes
>> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
>> pretty sure there is also weirdness going on in our KryoSerializer.
>>
>> On 04.09.20 14:59, Seth Wiesman wrote:
>>> There is already an implicit assumption the TypeSerializer for keys is
>>> stable/deterministic, RocksDB compares keys using their serialized byte
>>> strings. I think this is a non-issue (or at least it's not changing the
>>> status quo).
>>>
>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <tw...@apache.org> wrote:
>>>
>>>> +1 for getting rid of the TypeComparator interface and rely on the
>>>> serialized representation for grouping.
>>>>
>>>> Adding a new type to DataStream API is quite difficult at the moment
>>>> due
>>>> to too many components that are required: TypeInformation (tries to
>>>> deal
>>>> with logical fields for TypeComparators), TypeSerializer (incl. it's
>>>> snapshot interfaces), and TypeComparator (with many methods and
>>>> internals such normalized keys etc.).
>>>>
>>>> If necessary, we can add more simple comparison-related methods to the
>>>> TypeSerializer interface itself in the future (like
>>>> TypeSerializer.isDeterministic).
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>>>>> Thanks for publishing the FLIP!
>>>>>
>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dw...@apache.org>
>>>>> wrote:
>>>>>>     1. How to sort/group keys? What representation of the key
>>>>>> should we
>>>>>>        use? Should we sort on the binary form or should we depend on
>>>>>>        Comparators being available.
>>>>>
>>>>> Initially, I suggested to Dawid (in private) to do the
>>>>> sorting/grouping
>>>> by using the binary representation. Then my opinion switched and I
>>>> thought
>>>> we should use TypeComparator/Comparator because that's what the
>>>> DataSet API
>>>> uses. After talking to Stephan, I'm again encouraged in my opinion
>>>> to use
>>>> the binary representation because it means we can eventually get rid
>>>> of the
>>>> TypeComparator interface, which is a bit complicated, and because we
>>>> don't
>>>> need any good order in our sort, we only need the grouping.
>>>>>
>>>>> This comes with some problems, though: we need to ensure that the
>>>> TypeSerializer of the type we're sorting is stable/deterministic.
>>>> Beam has
>>>> infrastructure for this in the form of Coder.verifyDeterministic() [1]
>>>> which we don't have right now and should add if we go down this path.
>>>>>
>>>>>>     2. Where in the stack should we apply the sorting (this rather a
>>>>>>        discussion about internals)
>>>>>
>>>>> Here, I'm gravitating towards the third option of implementing it
>>>>> in the
>>>> layer of the StreamTask, which probably means implementing a custom
>>>> InputProcessor. I think it's best to do it in this layer because we
>>>> would
>>>> not mix concerns of different layers as we would if we implemented
>>>> this as
>>>> a custom StreamOperator. I think this solution is also best when it
>>>> comes
>>>> to multi-input operators.
>>>>>
>>>>>>     3. How should we deal with custom implementations of
>>>>>> StreamOperators
>>>>>
>>>>> I think the cleanest solution would be to go through the complete
>>>> operator lifecycle for every key, because then the watermark would not
>>>> oscillate between -Inf and +Inf and we would not break the semantical
>>>> guarantees that we gave to operators so far, in that the watermark is
>>>> strictly monotonically increasing. However, I don't think this
>>>> solution is
>>>> feasible because it would come with too much overhead. We should
>>>> solve this
>>>> problem via documentation and maybe educate people to not query the
>>>> current
>>>> watermark or not rely on the watermark being monotonically
>>>> increasing in
>>>> operator implementations to allow the framework more freedoms in how
>>>> user
>>>> programs are executed.
>>>>>
>>>>> Aljoscha
>>>>>
>>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>>>>
>>>>>
>>>>
>>>>
>>>
>>
> 


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Dawid Wysakowicz <dw...@apache.org>.
@Seth That's a very good point. I agree that RocksDB has the same
problem. I think we can use the same approach for the sorted shuffles
then. @Aljoscha I agree we should think about making it more resilient,
as I guess users might have problems already if they use keys with
non-deterministic binary representation. How do you feel about
addressing that separately purely to limit the scope of this FLIP?
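
For readers wondering what a "non-deterministic binary representation"
looks like in practice, here is a small sketch in plain Python. It is
not tied to any Flink or Beam serializer; the encoder functions are
hypothetical. It shows two cases where keys that compare as equal
produce different bytes, so grouping on the serialized form would split
them into separate groups.

```python
import struct


def encode_double(value: float) -> bytes:
    """Hypothetical key encoder: IEEE 754 double, big-endian."""
    return struct.pack(">d", value)


def encode_tags(tags) -> bytes:
    """Hypothetical encoder that writes elements in iteration order."""
    return b"".join(struct.pack(">H", len(t)) + t for t in tags)


def encode_tags_canonical(tags) -> bytes:
    """Same encoder, but with a canonical (sorted) element order."""
    return encode_tags(sorted(tags))


# Case 1: 0.0 and -0.0 are equal as values but differ in the sign bit.
zeros_equal = (0.0 == -0.0)
zero_bytes_equal = (encode_double(0.0) == encode_double(-0.0))

# Case 2: the same set of tags written in two different iteration orders.
tags_a = [b"x", b"y"]
tags_b = [b"y", b"x"]
same_set = (set(tags_a) == set(tags_b))
tag_bytes_equal = (encode_tags(tags_a) == encode_tags(tags_b))
canonical_bytes_equal = (
    encode_tags_canonical(tags_a) == encode_tags_canonical(tags_b)
)
```

Here `zeros_equal` and `same_set` are true while `zero_bytes_equal` and
`tag_bytes_equal` are false. Canonicalizing before encoding, as in
`encode_tags_canonical`, is one possible way to restore the invariant.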

@Aljoscha I tend to agree with you that the best place to actually place
the sorting would be in the InputProcessor(s). If there are no more
suggestions with respect to that issue, I'll put this proposal up for a
vote.

@all Thank you for the feedback so far. I'd like to start a voting
thread on the proposal tomorrow. Therefore I'd appreciate it if you
commented before that, if you still have some outstanding ideas.

Best,

Dawid

On 04/09/2020 17:13, Aljoscha Krettek wrote:
> Seth is right, I was just about to write that as well. There is a
> problem, though, because some of our TypeSerializers are not
> deterministic even though we use them as if they were. Beam excludes
> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
> pretty sure there is also weirdness going on in our KryoSerializer.
>
> On 04.09.20 14:59, Seth Wiesman wrote:
>> There is already an implicit assumption the TypeSerializer for keys is
>> stable/deterministic, RocksDB compares keys using their serialized byte
>> strings. I think this is a non-issue (or at least it's not changing the
>> status quo).
>>
>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <tw...@apache.org> wrote:
>>
>>> +1 for getting rid of the TypeComparator interface and rely on the
>>> serialized representation for grouping.
>>>
>>> Adding a new type to DataStream API is quite difficult at the moment
>>> due
>>> to too many components that are required: TypeInformation (tries to
>>> deal
>>> with logical fields for TypeComparators), TypeSerializer (incl. it's
>>> snapshot interfaces), and TypeComparator (with many methods and
>>> internals such normalized keys etc.).
>>>
>>> If necessary, we can add more simple comparison-related methods to the
>>> TypeSerializer interface itself in the future (like
>>> TypeSerializer.isDeterministic).
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>>>> Thanks for publishing the FLIP!
>>>>
>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dw...@apache.org>
>>>> wrote:
>>>>>    1. How to sort/group keys? What representation of the key
>>>>> should we
>>>>>       use? Should we sort on the binary form or should we depend on
>>>>>       Comparators being available.
>>>>
>>>> Initially, I suggested to Dawid (in private) to do the
>>>> sorting/grouping
>>> by using the binary representation. Then my opinion switched and I
>>> thought
>>> we should use TypeComparator/Comparator because that's what the
>>> DataSet API
>>> uses. After talking to Stephan, I'm again encouraged in my opinion
>>> to use
>>> the binary representation because it means we can eventually get rid
>>> of the
>>> TypeComparator interface, which is a bit complicated, and because we
>>> don't
>>> need any good order in our sort, we only need the grouping.
>>>>
>>>> This comes with some problems, though: we need to ensure that the
>>>> TypeSerializer of the type we're sorting is stable/deterministic. Beam
>>>> has infrastructure for this in the form of Coder.verifyDeterministic()
>>>> [1] which we don't have right now and should add if we go down this
>>>> path.
>>>>
>>>>>    2. Where in the stack should we apply the sorting (this rather a
>>>>>       discussion about internals)
>>>>
>>>> Here, I'm gravitating towards the third option of implementing it in the
>>>> layer of the StreamTask, which probably means implementing a custom
>>>> InputProcessor. I think it's best to do it in this layer because we
>>>> would not mix concerns of different layers as we would if we implemented
>>>> this as a custom StreamOperator. I think this solution is also best when
>>>> it comes to multi-input operators.
>>>>
>>>>>    3. How should we deal with custom implementations of StreamOperators
>>>>
>>>> I think the cleanest solution would be to go through the complete
>>>> operator lifecycle for every key, because then the watermark would not
>>>> oscillate between -Inf and +Inf and we would not break the semantical
>>>> guarantees that we gave to operators so far, in that the watermark is
>>>> strictly monotonically increasing. However, I don't think this solution
>>>> is feasible because it would come with too much overhead. We should
>>>> solve this problem via documentation and maybe educate people to not
>>>> query the current watermark or not rely on the watermark being
>>>> monotonically increasing in operator implementations to allow the
>>>> framework more freedoms in how user programs are executed.
>>>>
>>>> Aljoscha
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>>>>
>>>
>>>
>>
>


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Aljoscha Krettek <al...@apache.org>.
Seth is right, I was just about to write that as well. There is a 
problem, though, because some of our TypeSerializers are not 
deterministic even though we use them as if they were. Beam excludes the 
FloatCoder, for example, and the AvroCoder in certain cases. I'm pretty 
sure there is also weirdness going on in our KryoSerializer.
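
One possible illustration of why a float encoding can be unsafe for
byte-wise grouping, as a minimal plain-JDK sketch. This is not Flink or
Beam code, and the class and method names are made up for the example.
0.0f and -0.0f compare equal with ==, yet their encoded bytes differ, so
a grouping that looks only at bytes would put them into two groups.
Whether that matters depends on which notion of equality the key type is
supposed to follow.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Hypothetical demo class, not part of Flink or Beam.
final class FloatEncodingDemo {

    // Encodes a float as DataOutputStream does: 4 bytes of IEEE 754 bits, big-endian.
    static byte[] encode(float value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeFloat(value);
        } catch (IOException e) {
            // Not expected for an in-memory stream.
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        float positiveZero = 0.0f;
        float negativeZero = -0.0f;
        System.out.println("equal as values: " + (positiveZero == negativeZero));
        System.out.println("equal as bytes:  "
                + Arrays.equals(encode(positiveZero), encode(negativeZero)));
    }
}
```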

On 04.09.20 14:59, Seth Wiesman wrote:
> There is already an implicit assumption the TypeSerializer for keys is
> stable/deterministic, RocksDB compares keys using their serialized byte
> strings. I think this is a non-issue (or at least it's not changing the
> status quo).
> 
> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <tw...@apache.org> wrote:
> 
>> +1 for getting rid of the TypeComparator interface and rely on the
>> serialized representation for grouping.
>>
>> Adding a new type to DataStream API is quite difficult at the moment due
>> to too many components that are required: TypeInformation (tries to deal
>> with logical fields for TypeComparators), TypeSerializer (incl. its
>> snapshot interfaces), and TypeComparator (with many methods and
>> internals such as normalized keys etc.).
>>
>> If necessary, we can add more simple comparison-related methods to the
>> TypeSerializer interface itself in the future (like
>> TypeSerializer.isDeterministic).
>>
>> Regards,
>> Timo
>>
>>
>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>>> Thanks for publishing the FLIP!
>>>
>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dw...@apache.org> wrote:
>>>>    1. How to sort/group keys? What representation of the key should we
>>>>       use? Should we sort on the binary form or should we depend on
>>>>       Comparators being available.
>>>
>>> Initially, I suggested to Dawid (in private) to do the sorting/grouping
>>> by using the binary representation. Then my opinion switched and I
>>> thought we should use TypeComparator/Comparator because that's what the
>>> DataSet API uses. After talking to Stephan, I'm again encouraged in my
>>> opinion to use the binary representation because it means we can
>>> eventually get rid of the TypeComparator interface, which is a bit
>>> complicated, and because we don't need any good order in our sort, we
>>> only need the grouping.
>>>
>>> This comes with some problems, though: we need to ensure that the
>>> TypeSerializer of the type we're sorting is stable/deterministic. Beam
>>> has infrastructure for this in the form of Coder.verifyDeterministic()
>>> [1] which we don't have right now and should add if we go down this
>>> path.
>>>
>>>>    2. Where in the stack should we apply the sorting (this rather a
>>>>       discussion about internals)
>>>
>>> Here, I'm gravitating towards the third option of implementing it in the
>>> layer of the StreamTask, which probably means implementing a custom
>>> InputProcessor. I think it's best to do it in this layer because we
>>> would not mix concerns of different layers as we would if we implemented
>>> this as a custom StreamOperator. I think this solution is also best when
>>> it comes to multi-input operators.
>>>
>>>>    3. How should we deal with custom implementations of StreamOperators
>>>
>>> I think the cleanest solution would be to go through the complete
>>> operator lifecycle for every key, because then the watermark would not
>>> oscillate between -Inf and +Inf and we would not break the semantical
>>> guarantees that we gave to operators so far, in that the watermark is
>>> strictly monotonically increasing. However, I don't think this solution
>>> is feasible because it would come with too much overhead. We should
>>> solve this problem via documentation and maybe educate people to not
>>> query the current watermark or not rely on the watermark being
>>> monotonically increasing in operator implementations to allow the
>>> framework more freedoms in how user programs are executed.
>>>
>>> Aljoscha
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>>>
>>
>>
> 


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Seth Wiesman <sj...@gmail.com>.
There is already an implicit assumption that the TypeSerializer for keys is
stable/deterministic: RocksDB compares keys using their serialized byte
strings. I think this is a non-issue (or at least it's not changing the
status quo).
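
A rough sketch of what "keys are compared by their serialized bytes"
means in practice. This is plain JDK code standing in for a byte-ordered
key/value store; it is not RocksDB or Flink code, and ByteKeyedStore and
serializeKey are hypothetical names. Two keys land in the same entry
only if their serialized bytes are identical, which is why the store
already depends on a deterministic key serializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.TreeMap;

// Hypothetical stand-in for a byte-ordered key/value store.
final class ByteKeyedStore {

    // Unsigned lexicographic byte order; keys are matched purely on their bytes.
    private static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;

    private final TreeMap<byte[], Long> counts = new TreeMap<>(BYTEWISE);

    // Assumed key serializer for this sketch: the UTF-8 bytes of the string.
    static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Adds one to the counter stored under the serialized form of the key.
    void increment(String key) {
        counts.merge(serializeKey(key), 1L, Long::sum);
    }

    long get(String key) {
        return counts.getOrDefault(serializeKey(key), 0L);
    }

    int distinctKeys() {
        return counts.size();
    }

    public static void main(String[] args) {
        ByteKeyedStore store = new ByteKeyedStore();
        store.increment("a");
        store.increment("b");
        store.increment("a");
        System.out.println("a=" + store.get("a") + ", keys=" + store.distinctKeys());
    }
}
```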

On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <tw...@apache.org> wrote:

> +1 for getting rid of the TypeComparator interface and rely on the
> serialized representation for grouping.
>
> Adding a new type to DataStream API is quite difficult at the moment due
> to too many components that are required: TypeInformation (tries to deal
> with logical fields for TypeComparators), TypeSerializer (incl. its
> snapshot interfaces), and TypeComparator (with many methods and
> internals such as normalized keys etc.).
>
> If necessary, we can add more simple comparison-related methods to the
> TypeSerializer interface itself in the future (like
> TypeSerializer.isDeterministic).
>
> Regards,
> Timo
>
>
> On 04.09.20 11:48, Aljoscha Krettek wrote:
> > Thanks for publishing the FLIP!
> >
> > On 2020/09/01 06:49:06, Dawid Wysakowicz <dw...@apache.org> wrote:
> >>   1. How to sort/group keys? What representation of the key should we
> >>      use? Should we sort on the binary form or should we depend on
> >>      Comparators being available.
> >
> > Initially, I suggested to Dawid (in private) to do the sorting/grouping
> > by using the binary representation. Then my opinion switched and I
> > thought we should use TypeComparator/Comparator because that's what the
> > DataSet API uses. After talking to Stephan, I'm again encouraged in my
> > opinion to use the binary representation because it means we can
> > eventually get rid of the TypeComparator interface, which is a bit
> > complicated, and because we don't need any good order in our sort, we
> > only need the grouping.
> >
> > This comes with some problems, though: we need to ensure that the
> > TypeSerializer of the type we're sorting is stable/deterministic. Beam
> > has infrastructure for this in the form of Coder.verifyDeterministic()
> > [1] which we don't have right now and should add if we go down this
> > path.
> >
> >>   2. Where in the stack should we apply the sorting (this rather a
> >>      discussion about internals)
> >
> > Here, I'm gravitating towards the third option of implementing it in the
> > layer of the StreamTask, which probably means implementing a custom
> > InputProcessor. I think it's best to do it in this layer because we
> > would not mix concerns of different layers as we would if we implemented
> > this as a custom StreamOperator. I think this solution is also best when
> > it comes to multi-input operators.
> >
> >>   3. How should we deal with custom implementations of StreamOperators
> >
> > I think the cleanest solution would be to go through the complete
> > operator lifecycle for every key, because then the watermark would not
> > oscillate between -Inf and +Inf and we would not break the semantical
> > guarantees that we gave to operators so far, in that the watermark is
> > strictly monotonically increasing. However, I don't think this solution
> > is feasible because it would come with too much overhead. We should
> > solve this problem via documentation and maybe educate people to not
> > query the current watermark or not rely on the watermark being
> > monotonically increasing in operator implementations to allow the
> > framework more freedoms in how user programs are executed.
> >
> > Aljoscha
> >
> > [1]
> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
> >
>
>

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Timo Walther <tw...@apache.org>.
+1 for getting rid of the TypeComparator interface and relying on the
serialized representation for grouping.

Adding a new type to the DataStream API is quite difficult at the moment
because too many components are required: TypeInformation (which tries to
deal with logical fields for TypeComparators), TypeSerializer (incl. its
snapshot interfaces), and TypeComparator (with many methods and
internals such as normalized keys etc.).

If necessary, we can add further simple comparison-related methods to the
TypeSerializer interface itself in the future (like
TypeSerializer.isDeterministic).
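
To make the isDeterministic idea concrete, here is a hedged sketch of
how such a flag could be consulted before choosing byte-wise grouping.
SimpleSerializer is a hypothetical, heavily simplified interface and is
NOT Flink's TypeSerializer; the method name isDeterministic is taken
from the suggestion above, everything else is assumed for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Set;

// Hypothetical, simplified serializer interface; not Flink's TypeSerializer.
interface SimpleSerializer<T> {
    byte[] serialize(T value);

    // Assumed contract: true only if equal values always produce identical bytes.
    boolean isDeterministic();
}

final class StringSerializer implements SimpleSerializer<String> {
    @Override
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public boolean isDeterministic() {
        return true;
    }
}

final class StringSetSerializer implements SimpleSerializer<Set<String>> {
    @Override
    public byte[] serialize(Set<String> value) {
        // Writes elements in iteration order. Set does not specify that order,
        // so two equal sets may be written as different byte sequences.
        return String.join("\u0000", value).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public boolean isDeterministic() {
        return false;
    }
}

final class GroupingPlanner {
    // Byte-wise grouping is only considered safe for deterministic serializers.
    static boolean canGroupOnBytes(SimpleSerializer<?> keySerializer) {
        return keySerializer.isDeterministic();
    }

    public static void main(String[] args) {
        System.out.println(canGroupOnBytes(new StringSerializer()));
        System.out.println(canGroupOnBytes(new StringSetSerializer()));
    }
}
```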

Regards,
Timo


On 04.09.20 11:48, Aljoscha Krettek wrote:
> Thanks for publishing the FLIP!
> 
> On 2020/09/01 06:49:06, Dawid Wysakowicz <dw...@apache.org> wrote:
>>   1. How to sort/group keys? What representation of the key should we
>>      use? Should we sort on the binary form or should we depend on
>>      Comparators being available.
> 
> Initially, I suggested to Dawid (in private) to do the sorting/grouping by using the binary representation. Then my opinion switched and I thought we should use TypeComparator/Comparator because that's what the DataSet API uses. After talking to Stephan, I'm again encouraged in my opinion to use the binary representation because it means we can eventually get rid of the TypeComparator interface, which is a bit complicated, and because we don't need any good order in our sort, we only need the grouping.
> 
> This comes with some problems, though: we need to ensure that the TypeSerializer of the type we're sorting is stable/deterministic. Beam has infrastructure for this in the form of Coder.verifyDeterministic() [1] which we don't have right now and should add if we go down this path.
> 
>>   2. Where in the stack should we apply the sorting (this rather a
>>      discussion about internals)
> 
> Here, I'm gravitating towards the third option of implementing it in the layer of the StreamTask, which probably means implementing a custom InputProcessor. I think it's best to do it in this layer because we would not mix concerns of different layers as we would if we implemented this as a custom StreamOperator. I think this solution is also best when it comes to multi-input operators.
> 
>>   3. How should we deal with custom implementations of StreamOperators
> 
> I think the cleanest solution would be to go through the complete operator lifecycle for every key, because then the watermark would not oscillate between -Inf and +Inf and we would not break the semantical guarantees that we gave to operators so far, in that the watermark is strictly monotonically increasing. However, I don't think this solution is feasible because it would come with too much overhead. We should solve this problem via documentation and maybe educate people to not query the current watermark or not rely on the watermark being monotonically increasing in operator implementations to allow the framework more freedoms in how user programs are executed.
> 
> Aljoscha
> 
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
> 


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks for publishing the FLIP!

On 2020/09/01 06:49:06, Dawid Wysakowicz <dw...@apache.org> wrote: 
>  1. How to sort/group keys? What representation of the key should we
>     use? Should we sort on the binary form or should we depend on
>     Comparators being available.

Initially, I suggested to Dawid (in private) that we do the sorting/grouping using the binary representation. Then I changed my mind and thought we should use TypeComparator/Comparator because that's what the DataSet API uses. After talking to Stephan, I'm again in favor of the binary representation because it means we can eventually get rid of the TypeComparator interface, which is a bit complicated, and because we don't need any meaningful order in our sort; we only need the grouping.
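
A minimal sketch of grouping via the binary form, assuming keys are already serialized to byte arrays. The order produced by comparing bytes carries no meaning for the key type; it only guarantees that records with identical key bytes end up next to each other. BinaryGrouping and Entry are hypothetical names, not Flink classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: group records by sorting on their serialized key bytes.
final class BinaryGrouping {

    // A record reduced to its serialized key and a payload value.
    static final class Entry {
        final byte[] keyBytes;
        final int value;

        Entry(byte[] keyBytes, int value) {
            this.keyBytes = keyBytes;
            this.value = value;
        }
    }

    // Convenience factory; UTF-8 stands in for a real key serializer.
    static Entry entry(String key, int value) {
        return new Entry(key.getBytes(StandardCharsets.UTF_8), value);
    }

    static List<List<Entry>> groupBySerializedKey(List<Entry> input) {
        List<Entry> sorted = new ArrayList<>(input);
        // Any total order on the bytes works; it only has to cluster equal keys.
        sorted.sort((a, b) -> Arrays.compare(a.keyBytes, b.keyBytes));

        List<List<Entry>> groups = new ArrayList<>();
        byte[] currentKey = null;
        for (Entry entry : sorted) {
            // A change in key bytes marks the start of the next group.
            if (currentKey == null || !Arrays.equals(currentKey, entry.keyBytes)) {
                groups.add(new ArrayList<>());
                currentKey = entry.keyBytes;
            }
            groups.get(groups.size() - 1).add(entry);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<List<Entry>> groups = groupBySerializedKey(Arrays.asList(
                entry("b", 1), entry("a", 2), entry("b", 3), entry("a", 4), entry("c", 5)));
        System.out.println("number of groups: " + groups.size());
    }
}
```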

This comes with some problems, though: we need to ensure that the TypeSerializer of the type we're sorting is stable/deterministic. Beam has infrastructure for this in the form of Coder.verifyDeterministic() [1], which we don't have right now and should add if we go down this path.

>  2. Where in the stack should we apply the sorting (this rather a
>     discussion about internals)

Here, I'm gravitating towards the third option of implementing it in the layer of the StreamTask, which probably means implementing a custom InputProcessor. I think it's best to do it in this layer because we would not mix concerns of different layers as we would if we implemented this as a custom StreamOperator. I think this solution is also best when it comes to multi-input operators.
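
A sketch of the layering argument, under the assumption that the sorting sits between the task's input and the operator. None of these types are Flink classes: SortingInput, KeySerializer, onRecord and onEndOfInput are invented for illustration, and a real implementation would spill to disk instead of buffering everything in memory. The point is only that the operator stays unchanged and simply receives records already clustered by key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical input-layer wrapper: buffers a bounded input, sorts it by the
// serialized key, then hands the records to an unmodified operator.
final class SortingInput<T> {

    // Assumed hook that extracts and serializes the key of a record.
    interface KeySerializer<T> {
        byte[] keyBytes(T record);
    }

    private final List<T> buffer = new ArrayList<>();
    private final KeySerializer<T> keySerializer;
    private final Consumer<T> operator;

    SortingInput(KeySerializer<T> keySerializer, Consumer<T> operator) {
        this.keySerializer = keySerializer;
        this.operator = operator;
    }

    // Called for every incoming record; nothing reaches the operator yet.
    void onRecord(T record) {
        buffer.add(record);
    }

    // Called once when the bounded input is exhausted.
    void onEndOfInput() {
        // List.sort is stable, so the arrival order within one key is preserved.
        buffer.sort((a, b) ->
                Arrays.compare(keySerializer.keyBytes(a), keySerializer.keyBytes(b)));
        buffer.forEach(operator);
        buffer.clear();
    }
}
```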

>  3. How should we deal with custom implementations of StreamOperators

I think the cleanest solution would be to go through the complete operator lifecycle for every key, because then the watermark would not oscillate between -Inf and +Inf and we would not break the semantic guarantees that we have given to operators so far, namely that the watermark is strictly monotonically increasing. However, I don't think this solution is feasible because it would come with too much overhead. We should solve this problem via documentation and maybe educate people not to query the current watermark and not to rely on the watermark being monotonically increasing in operator implementations, which would give the framework more freedom in how user programs are executed.
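
A small sketch of the oscillation described above, assuming a single operator instance is reused across keys and that the runtime advances the watermark to +Inf after each key to fire that key's timers. Long.MIN_VALUE and Long.MAX_VALUE stand in for -Inf and +Inf, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the watermarks an operator would see with per-key processing.
final class PerKeyWatermarks {

    static final long MIN_WATERMARK = Long.MIN_VALUE; // stands in for -Inf
    static final long MAX_WATERMARK = Long.MAX_VALUE; // stands in for +Inf

    // Watermarks observed by one operator instance that is reused for every key.
    static List<Long> observedWatermarks(int numberOfKeys) {
        List<Long> observed = new ArrayList<>();
        for (int key = 0; key < numberOfKeys; key++) {
            observed.add(MIN_WATERMARK); // while the records of this key are processed
            observed.add(MAX_WATERMARK); // once the key is complete, to fire its timers
        }
        return observed;
    }

    // True if no value is smaller than the one before it.
    static boolean isNonDecreasing(List<Long> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i) < values.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isNonDecreasing(observedWatermarks(1)));
        System.out.println(isNonDecreasing(observedWatermarks(2)));
    }
}
```

With a single key the sequence is still monotonic; from the second key on, the watermark drops back to -Inf, which is exactly what an operator relying on monotonicity would trip over.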

Aljoscha

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184