Posted to user@flink.apache.org by Kostya Kulagin <kk...@gmail.com> on 2016/04/20 23:44:24 UTC

Count windows missing last elements?

I have a pretty big but finite stream and I need to be able to window it by
number of elements.
From my observations, Flink can 'skip' the last chunk of data in this case
if it has fewer elements than the window size:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Finite source: emits 0..34 and then returns.
    DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
        LongStream.range(0, 35).forEach(ctx::collect);
      }

      @Override
      public void cancel() {
      }
    });

    // Count window of 10 elements; each fired window is printed as one comma-separated line.
    source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
      @Override
      public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
        System.out.println(Joiner.on(',').join(values));
      }
    }).print();

    env.execute("yoyoyo");


Output:
0,1,2,3,4,5,6,7,8,9
10,11,12,13,14,15,16,17,18,19
20,21,22,23,24,25,26,27,28,29

I.e. elements from 30 to 34 are not being processed.

Would it make sense to have a count OR timeout window, which fires the
current window when either the number of elements reaches a threshold or a
collection timeout occurs?
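
For reference, the arithmetic behind the missing tail can be reproduced
without Flink. The sketch below is a plain-Java model, not Flink code:
CountWindowModel and firedWindows are hypothetical names, and it assumes a
count window fires only once it holds exactly `size` elements, which is
consistent with the output above.

```java
import java.util.ArrayList;
import java.util.List;

public class CountWindowModel {

    // Returns the windows that fire when a window is emitted only after
    // it holds exactly `size` elements; a partial tail never fires.
    static List<List<Long>> firedWindows(long elementCount, int size) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        for (long i = 0; i < elementCount; i++) {
            buffer.add(i);
            if (buffer.size() == size) {
                fired.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        // Whatever is still in `buffer` at this point is never emitted.
        return fired;
    }

    public static void main(String[] args) {
        List<List<Long>> fired = firedWindows(35, 10);
        fired.forEach(System.out::println);
        long emitted = fired.size() * 10L;
        System.out.println("emitted=" + emitted + ", stuck in buffer=" + (35 - emitted));
    }
}
```

With 35 elements and a window size of 10, three windows fire and the
remaining 5 elements (30 to 34) stay buffered.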

Re: Count windows missing last elements?

Posted by Konstantin Kulagin <kk...@gmail.com>.
Thanks!

Now I can call myself a super flink developer :)

As for the issue - I am still trying to figure out ways to do that. I've
raised a question in this thread:

http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAJ746X69L%2ByARu3pq74peov1TxyfUPhtQWg3ffLJ5SQk4OmTAg%40mail.gmail.com%3E

Thanks for your help!


On Mon, Apr 25, 2016 at 9:26 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Yes, this looks correct for a Counting Trigger that flushes when the
> sources finish. Could you also solve your filtering problem with this or is
> this still an open issue?
>
> Cheers,
> Aljoscha
>
> On Sat, 23 Apr 2016 at 16:57 Konstantin Kulagin <kk...@gmail.com>
> wrote:
>
>> I finally was able to do that. Kinda ugly, but works:
>>
>> https://gist.github.com/krolen/ed1344e4d7be5b2116061685268651f5
>>
>>
>>
>> On Fri, Apr 22, 2016 at 6:14 PM, Konstantin Kulagin <kk...@gmail.com>
>> wrote:
>>
>>> I was trying to implement this (force flink to handle all values from
>>> input) but had no success...
>>> Probably I am not getting smth with flink windowing mechanism
>>> I've created my 'finishing' trigger which is basically a copy of purging
>>> trigger
>>>
>>> But was not able to make it work:
>>>
>>> https://gist.github.com/krolen/9e6ba8b14c54554bfbc10fdfa6fe7308
>>>
>>> I was never able to see numbers from 30 to 34 in result.
>>> What am I doing wrong?
>>>
>>>
>>> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> People have wondered about that a few times, yes. My opinion is that a
>>>> stream is potentially infinite and processing only stops for anomalous
>>>> reasons: when the job crashes, when stopping a job to later redeploy it. In
>>>> those cases you would not want to flush out your data but keep them and
>>>> restart from the same state when the job is restarted.
>>>>
>>>> You can implement the behavior by writing a custom Trigger that behaves
>>>> like the count trigger but also fires when receiving a Long.MAX_VALUE
>>>> watermark. A watermark of Long.MAX_VALUE signifies that a source has
>>>> stopped processing for natural reasons.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kk...@gmail.com> wrote:
>>>>
>>>>> Thanks,
>>>>>
>>>>> I wonder whether it would be good to have such functionality built in,
>>>>> at least flushing the remaining elements when the incoming stream finishes.
>>>>>
>>>>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>> yes, you can achieve this by writing a custom Trigger that fires
>>>>>> either on the count or after a long-enough timeout. It would be a
>>>>>> combination of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger),
>>>>>> so you could look at those to get started.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>

Re: Count windows missing last elements?

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, this looks correct for a Counting Trigger that flushes when the
sources finish. Could you also solve your filtering problem with this or is
this still an open issue?

Cheers,
Aljoscha
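
One rough way to picture such a trigger, as a plain-Java model rather than
Flink's Trigger API: fire on every `size`-th element, and fire once more when
an end-of-input signal arrives. FlushingCountModel and its methods are
hypothetical names; in Flink the end-of-input signal would be the
Long.MAX_VALUE watermark discussed in this thread, which the model represents
with an explicit onEndOfInput() call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FlushingCountModel {
    private final int size;
    private final Consumer<List<Long>> sink;
    private List<Long> buffer = new ArrayList<>();

    FlushingCountModel(int size, Consumer<List<Long>> sink) {
        this.size = size;
        this.sink = sink;
    }

    // Count behaviour: emit and reset the buffer on every size-th element.
    void onElement(long value) {
        buffer.add(value);
        if (buffer.size() >= size) {
            fire();
        }
    }

    // End-of-input behaviour: emit the partial tail, if there is one.
    void onEndOfInput() {
        if (!buffer.isEmpty()) {
            fire();
        }
    }

    private void fire() {
        sink.accept(buffer);
        buffer = new ArrayList<>();
    }

    // Feeds 0..elementCount-1 through the model and collects every fired window.
    static List<List<Long>> run(long elementCount, int size) {
        List<List<Long>> out = new ArrayList<>();
        FlushingCountModel model = new FlushingCountModel(size, out::add);
        for (long i = 0; i < elementCount; i++) {
            model.onElement(i);
        }
        model.onEndOfInput();
        return out;
    }

    public static void main(String[] args) {
        run(35, 10).forEach(System.out::println);
    }
}
```

For 35 elements and a size of 10 the model fires four times, the last window
holding 30 to 34. A real trigger would additionally have to deal with state
and checkpointing, which this model ignores.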


Re: Count windows missing last elements?

Posted by Konstantin Kulagin <kk...@gmail.com>.
I finally was able to do that. Kinda ugly, but works:

https://gist.github.com/krolen/ed1344e4d7be5b2116061685268651f5




Re: Count windows missing last elements?

Posted by Konstantin Kulagin <kk...@gmail.com>.
I tried to implement this (forcing Flink to handle all values from the
input) but had no success...
I am probably missing something about Flink's windowing mechanism.
I created my own 'finishing' trigger, which is basically a copy of the
purging trigger, but was not able to make it work:

https://gist.github.com/krolen/9e6ba8b14c54554bfbc10fdfa6fe7308

I was never able to see the numbers from 30 to 34 in the result.
What am I doing wrong?



Re: Count windows missing last elements?

Posted by Konstantin Kulagin <kk...@gmail.com>.
No problem at all. There are not many Flink people and a lot of people
asking questions, so it must be hard to understand each person's issue :)


Yes, it is not as easy as a 'contains' operator: I need to collect a number
of tuples in order to create an in-memory Lucene index. After that I will
filter entries based on a predefined query.

So in a simplified case:
  -> for a window of tuples (preferably based on element count)
  -> apply some operation to all elements in the window (creating an index in
     my case, but string concatenation, say, would work as well, i.e. any
     operation that involves all of the window's tuples and produces some
     resulting data)
  -> filter each of the window's elements based on the result of this
     all-window-elements operation
  -> emit the filtered tuples

It might be a bit hard to understand. If it is, never mind.
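
The steps above can be sketched in plain Java, outside Flink. Everything here
is an assumption for illustration: BatchFilterSketch, buildIndex and process
are hypothetical names, and the 'index' is just the set of keys whose text
contains a query string, standing in for the in-memory Lucene index.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BatchFilterSketch {

    // Stand-in for the expensive step that needs the whole window:
    // it records the keys whose text contains the query.
    static Set<Long> buildIndex(List<Map.Entry<Long, String>> window, String query) {
        Set<Long> matches = new HashSet<>();
        for (Map.Entry<Long, String> tuple : window) {
            if (tuple.getValue().contains(query)) {
                matches.add(tuple.getKey());
            }
        }
        return matches;
    }

    // Windows by count, builds one index per window, then filters that
    // window's tuples against the index. The last, partial window is flushed too.
    static List<Long> process(long elementCount, int windowSize, String query) {
        List<Long> emitted = new ArrayList<>();
        List<Map.Entry<Long, String>> window = new ArrayList<>();
        for (long l = 0; l < elementCount; l++) {
            window.add(new SimpleEntry<Long, String>(l, "This is " + l));
            boolean last = (l == elementCount - 1);
            if (window.size() == windowSize || last) {
                Set<Long> index = buildIndex(window, query);
                for (Map.Entry<Long, String> tuple : window) {
                    if (index.contains(tuple.getKey())) {
                        emitted.add(tuple.getKey());
                    }
                }
                window = new ArrayList<>();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(process(30, 10, "3"));
    }
}
```

The point of the shape is that the costly step runs once per window rather
than once per tuple, while the emitted tuples are still the individual
elements.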



On Fri, Apr 22, 2016 at 9:27 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> I'm afraid I don't understand your use case yet. In you example you want
> to preserve only the elements where the string value contains a "3"? This
> can be done using a filter, as in
>
> source.filter( value -> value.f1.contains("3") )
>
> This is probably too easy, though, and I'm misunderstanding the problem.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 18:26 Kostya Kulagin <kk...@gmail.com> wrote:
>
>> Thanks for reply.
>>
>> Maybe I would need some advise in this case. My situation: we have a
>> stream of data, generally speaking <Long;String> tuples where long is a
>> unique key (ie there are no tuples with the same key)
>>
>> I need to filter out all tuples that do not match certain lucene query.
>>
>> Creating lucene index on one entry is too expensive and I cannot guess
>> what load in terms of number of entries per second would be. Idea was to
>> group entries by count, create index, filter and stream remaining tuples
>> for further processing.
>>
>> As a sample application if we replace lucene indexing with something like
>> String's 'contains' method source would look like this:
>>
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
>>   @Override
>>   public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
>>     LongStream.range(0, 30).forEach(l -> {
>>       ctx.collect(Tuple2.of(l, "This is " + l));
>>
>>
>>     });
>>   }
>>
>>   @Override
>>   public void cancel() {
>>
>>   }
>> });
>>
>> And I need lets say to window tuples and preserve only those which
>> value.contains("3").
>> There are no grouping by key since basically all keys are different. I
>> might not know everything about flink yet but for this particular example -
>> does what you were saying make sense?
>>
>>
>> Thanks!
>> Kostya
>>
>>
>>
>>
>>
>>
>> On Thu, Apr 21, 2016 at 11:02 AM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> Hi,
>>> if you are not using the windows for their actual semantics, I would
>>> suggest not using count-based windows, and also not using the *All windows.
>>> The *All windows are all non-parallel, i.e. you always get only one
>>> parallel instance of your window operator, even if you have a huge cluster.
>>>
>>> Also, in most cases it is better to not use a plain WindowFunction with
>>> apply because all elements have to be buffered so that they can be passed
>>> as an Iterable, Iterable<Long> in your example. If you can, I would suggest
>>> to use a ReduceFunction or FoldFunction or an apply() with an incremental
>>> aggregation function: apply(ReduceFunction, WindowFunction) or
>>> apply(FoldFunction, WindowFunction). These allow incremental aggregation of
>>> the result as elements arrive and don't require buffering of all elements
>>> until the window fires.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 16:53 Kostya Kulagin <kk...@gmail.com> wrote:
>>>
>>>> Maybe, if it is not the first time, it is worth considering adding this
>>>> as an option? ;-)
>>>>
>>>> My use case: I have a pretty big amount of data, basically for ETL. It is
>>>> finite but it is big. I see it more as a stream than as a dataset. Also, I
>>>> would re-use the same code for an infinite stream later...
>>>> And I do not much care about the exact window size; I create windows just
>>>> for performance reasons.
>>>>
>>>> Anyway, thank you for the responses!
>>>>
>>>>

Re: Count windows missing last elements?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I'm afraid I don't understand your use case yet. In your example you want to
preserve only the elements where the string value contains a "3"? This can
be done using a filter, as in

source.filter( value -> value.f1.contains("3") )

This is probably too easy, though, and I'm misunderstanding the problem.

Cheers,
Aljoscha
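
Because this predicate looks at one element at a time, it needs no window at
all. A minimal plain-Java sketch of the same check on the sample data from
this thread (ElementFilterSketch and matchingKeys are hypothetical names, and
java.util.stream stands in for the Flink stream here):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ElementFilterSketch {

    // Keeps the keys whose generated text contains the query string.
    static List<Long> matchingKeys(long elementCount, String query) {
        return LongStream.range(0, elementCount)
                .filter(l -> ("This is " + l).contains(query))
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(matchingKeys(30, "3"));
    }
}
```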


Re: Count windows missing last elements?

Posted by Kostya Kulagin <kk...@gmail.com>.
Thanks for the reply.

Maybe I need some advice in this case. My situation: we have a stream of
data, generally speaking <Long, String> tuples where the Long is a unique
key (i.e. no two tuples share a key).

I need to filter out all tuples that do not match a certain Lucene query.

Creating a Lucene index for a single entry is too expensive, and I cannot
predict what the load in entries per second will be. The idea was to group
entries by count, create an index, filter, and stream the remaining tuples
on for further processing.

As a sample application, if we replace Lucene indexing with something like
String's 'contains' method, the source would look like this:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
  @Override
  public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
    LongStream.range(0, 30).forEach(l -> {
      ctx.collect(Tuple2.of(l, "This is " + l));
    });
  }

  @Override
  public void cancel() {

  }
});

And I need, let's say, to window the tuples and keep only those for which
value.contains("3") is true.
There is no grouping by key, since basically all keys are different. I
might not know everything about Flink yet, but for this particular example,
does what you were saying make sense?


Thanks!
Kostya






On Thu, Apr 21, 2016 at 11:02 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> if you are doing the windows not for their actual semantics I would
> suggest not using count based windows and also not using the *All windows.
> The *All windows are all non-parallel, i.e. you always only get one
> parallel instance of your window operator even if you have a huge cluster.
>
> Also, in most cases it is better to not use a plain WindowFunction with
> apply because all elements have to be buffered so that they can be passed
> as an Iterable, Iterable<Long> in your example. If you can, I would suggest
> to use a ReduceFunction or FoldFunction or an apply() with an incremental
> aggregation function: apply(ReduceFunction, WindowFunction) or
> apply(FoldFunction, WindowFunction). These allow incremental aggregation of
> the result as elements arrive and don't require buffering of all elements
> until the window fires.
>
> Cheers,
> Aljoscha

Re: Count windows missing last elements?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
if you are not using the windows for their actual semantics, I would suggest
not using count-based windows and also not using the *All windows. The *All
windows are all non-parallel, i.e. you always get only one parallel
instance of your window operator even if you have a huge cluster.

Also, in most cases it is better not to use a plain WindowFunction with
apply(), because all elements have to be buffered so that they can be passed
as an Iterable (Iterable<Long> in your example). If you can, I would suggest
using a ReduceFunction or FoldFunction, or an apply() with an incremental
aggregation function: apply(ReduceFunction, WindowFunction) or
apply(FoldFunction, WindowFunction). These allow incremental aggregation of
the result as elements arrive and don't require buffering all elements
until the window fires.
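
To make the difference between buffering and incremental aggregation
concrete, here is a minimal plain-Java sketch. It is a framework-independent
model, not Flink code: the class and method names (CountWindowModel,
sumBuffered, sumIncremental) are made up for illustration, and summing is
just an assumed stand-in for whatever the window function computes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Framework-independent model of a count window; none of these names are Flink API. */
class CountWindowModel {

    /** Buffering style: every element is kept until the window is full. */
    static List<Long> sumBuffered(List<Long> input, int size) {
        List<Long> results = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        for (long value : input) {
            buffer.add(value);
            if (buffer.size() == size) {
                long sum = 0;
                for (long v : buffer) {
                    sum += v;
                }
                results.add(sum);
                buffer.clear();
            }
        }
        return results; // a trailing partial window is never emitted
    }

    /** Incremental style: each element is folded into a single accumulator. */
    static List<Long> sumIncremental(List<Long> input, int size, BinaryOperator<Long> reduce) {
        List<Long> results = new ArrayList<>();
        Long acc = null;
        int count = 0;
        for (Long value : input) {
            acc = (acc == null) ? value : reduce.apply(acc, value);
            count++;
            if (count == size) {
                results.add(acc);
                acc = null;
                count = 0;
            }
        }
        return results; // same results, but only one value of state per window
    }
}
```

Both methods produce the same per-window results; the incremental variant
holds one accumulator instead of up to `size` elements. Neither emits the
trailing partial window, which mirrors the behaviour discussed in this thread.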

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 16:53 Kostya Kulagin <kk...@gmail.com> wrote:

> Maybe if it is not the first time it worth considering adding this thing
> as an option? ;-)
>
> My usecase - I have a pretty big amount of data basically for ETL. It is
> finite but it is big. I see it more as a stream not as a dataset. Also I
> would re-use the same code for infinite stream later...
> And I do not much care about exact window size - it is just for
> performance reasons I create a windows.
>
> Anyways - that you for the responses!

Re: Count windows missing last elements?

Posted by Kostya Kulagin <kk...@gmail.com>.
Maybe, if it is not the first time, it is worth considering adding this as
an option? ;-)

My use case: I have a pretty big amount of data, basically for ETL. It is
finite but it is big. I see it more as a stream than as a dataset. Also, I
would re-use the same code for an infinite stream later...
And I do not much care about the exact window size; I create windows just
for performance reasons.

Anyway, thank you for the responses!


On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> People have wondered about that a few times, yes. My opinion is that a
> stream is potentially infinite and processing only stops for anomalous
> reasons: when the job crashes, when stopping a job to later redeploy it. In
> those cases you would not want to flush out your data but keep them and
> restart from the same state when the job is restarted.
>
> You can implement the behavior by writing a custom Trigger that behaves
> like the count trigger but also fires when receiving a Long.MAX_VALUE
> watermark. A watermark of Long.MAX_VALUE signifies that a source has
> stopped processing for natural reasons.
>
> Cheers,
> Aljoscha

Re: Count windows missing last elements?

Posted by Aljoscha Krettek <al...@apache.org>.
People have wondered about that a few times, yes. My opinion is that a
stream is potentially infinite and processing only stops for anomalous
reasons: when the job crashes, when stopping a job to later redeploy it. In
those cases you would not want to flush out your data but keep them and
restart from the same state when the job is restarted.

You can implement the behavior by writing a custom Trigger that behaves
like the count trigger but also fires when receiving a Long.MAX_VALUE
watermark. A watermark of Long.MAX_VALUE signifies that a source has
stopped processing for natural reasons.
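
The decision logic of such a trigger can be sketched in plain Java. This is
only a model under stated assumptions, not an implementation of Flink's
Trigger interface: the class CountOrFinalWatermarkModel and its methods
onElement, onWatermark and firedWindows are hypothetical names chosen for
illustration. The only thing taken from the description above is that a
watermark of Long.MAX_VALUE marks the natural end of the source.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of "fire on count, or on the final watermark"; not Flink API. */
class CountOrFinalWatermarkModel {
    /** A watermark of Long.MAX_VALUE signals that the source has finished. */
    static final long FINAL_WATERMARK = Long.MAX_VALUE;

    private final int maxCount;
    private final List<Long> pending = new ArrayList<>();
    private final List<List<Long>> fired = new ArrayList<>();

    CountOrFinalWatermarkModel(int maxCount) {
        this.maxCount = maxCount;
    }

    /** Count path: behaves like a count trigger. */
    void onElement(long value) {
        pending.add(value);
        if (pending.size() >= maxCount) {
            fire();
        }
    }

    /** End-of-input path: flush the partial window only on the final watermark. */
    void onWatermark(long watermark) {
        if (watermark == FINAL_WATERMARK && !pending.isEmpty()) {
            fire();
        }
    }

    private void fire() {
        fired.add(new ArrayList<>(pending));
        pending.clear();
    }

    List<List<Long>> firedWindows() {
        return fired;
    }
}
```

Feeding the elements 0 to 34 with maxCount = 10 fires three full windows; a
subsequent onWatermark(Long.MAX_VALUE) then flushes the remaining elements
30 to 34. Any other watermark leaves the partial window in place, which is
the "keep state on abnormal stop" behaviour described above.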

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kk...@gmail.com> wrote:

> Thanks,
>
> I wonder wouldn't it be good to have a built-in such functionality. At
> least when incoming stream is finished - flush remaining elements.

Re: Count windows missing last elements?

Posted by Kostya Kulagin <kk...@gmail.com>.
Thanks,

I wonder whether it would be good to have such functionality built in, at
least to flush the remaining elements when the incoming stream is finished.

On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> yes, you can achieve this by writing a custom Trigger that can trigger
> both on the count or after a long-enough timeout. It would be a combination
> of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger) so you
> could look to those to get started.
>
> Cheers,
> Aljoscha

Re: Count windows missing last elements?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
yes, you can achieve this by writing a custom Trigger that fires either
on the count or after a long-enough timeout. It would be a combination of
CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger), so you could
look at those to get started.
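
As a rough illustration of how the two conditions combine, here is a
plain-Java sketch. It does not use Flink's Trigger API; CountOrTimeoutModel,
onElement, onTimer and firedWindows are hypothetical names, and time is
passed in explicitly as a millisecond value so the logic stays independent
of any particular clock (event time or processing time).

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a "count OR timeout" trigger; hypothetical names, not Flink API. */
class CountOrTimeoutModel {
    private final int maxCount;
    private final long timeoutMillis;
    private final List<Long> pending = new ArrayList<>();
    private final List<List<Long>> fired = new ArrayList<>();
    private long deadline; // only meaningful while pending is non-empty

    CountOrTimeoutModel(int maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    /** Count path: the first element of a batch arms the timer, the maxCount-th fires. */
    void onElement(long value, long nowMillis) {
        if (pending.isEmpty()) {
            deadline = nowMillis + timeoutMillis;
        }
        pending.add(value);
        if (pending.size() >= maxCount) {
            fire();
        }
    }

    /** Timeout path: fires a partial batch once the armed deadline has passed. */
    void onTimer(long nowMillis) {
        if (!pending.isEmpty() && nowMillis >= deadline) {
            fire();
        }
    }

    private void fire() {
        fired.add(new ArrayList<>(pending));
        pending.clear();
    }

    List<List<Long>> firedWindows() {
        return fired;
    }
}
```

The timer is armed by the first element of each batch rather than on a fixed
schedule, so a batch never waits longer than the timeout after its first
element arrives. That is one possible design choice; a fixed periodic flush
would be another.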

Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kk...@gmail.com> wrote:

> I have a pretty big but finite stream and I need to be able to window it by
> number of elements.
> In this case, from my observations, Flink can 'skip' the last chunk of
> data if it has fewer elements than the window size:
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>
>       @Override
>       public void run(SourceContext<Long> ctx) throws Exception {
>         LongStream.range(0, 35).forEach(ctx::collect);
>       }
>
>       @Override
>       public void cancel() {
>
>       }
>     });
>
>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>       @Override
>       public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>         System.out.println(Joiner.on(',').join(values));
>       }
>     }).print();
>
>     env.execute("yoyoyo");
>
>
> Output:
> 0,1,2,3,4,5,6,7,8,9
> 10,11,12,13,14,15,16,17,18,19
> 20,21,22,23,24,25,26,27,28,29
>
> I.e. elements from 30 to 34 are not being processed.
>
> Does it make sense to have a count OR timeout window which will emit a new
> window when the number of elements reaches a threshold OR a collection
> timeout occurs?
>