Posted to user@flink.apache.org by Matt <dr...@gmail.com> on 2016/12/14 17:17:00 UTC

Multiple consumers and custom triggers

Hello people,

I've written down some quick questions for which I couldn't find much or
anything in the documentation. I hope you can answer some of them!

*# Multiple consumers*

*1.* Is it possible to .union() streams of different classes? This would be
useful for building a consumer that counts elements across different topics,
for example by keying on the class name of each element and using a tumbling
window of, say, 5 minutes.

*2.* In case #1 is not possible, I need to launch multiple consumers to
achieve the same effect. However, I'm getting a "Factory already
initialized" error if I run environment.execute() for two consumers on
different threads. How do you .execute() more than one consumer on the same
application?

*# Custom triggers*

*3.* If a custom .trigger() overwrites the trigger of the WindowAssigner
used previously, why do we have to specify a WindowAssigner (such as
TumblingProcessingTimeWindows) in order to be able to specify a custom
trigger? Shouldn't it be possible to send a trigger to .window()?

*4.* I need a stream with a CountWindow (size 10, slide 1, let's say) that
may take more than 10 hours to fill for the first time, but in the meantime I
want to process whatever elements have already arrived. I guess the way to do
this is to create a custom trigger that fires on every new element, with up
to 10 elements at a time. The resulting windows would have sizes 1, 2, 3,
..., 9, 10, 10, 10, .... Is there a way to achieve this with predefined
triggers, or is a custom trigger the only way to go here?

Best regards,
Matt

Re: Multiple consumers and custom triggers

Posted by Jamie Grier <ja...@data-artisans.com>.
To be more clear...

A single source in a Flink program is a logical concept.  Flink jobs run
with some degree of parallelism, meaning that multiple copies of your
source (and all other) functions run distributed across the cluster.  So
if you have a streaming program with two sources and you run it with a
parallelism of 8, there are actually 16 source functions executing on the
cluster: 8 instances of each of the two source operators you've defined
in your Flink job.

For more info on this you may want to read through the following:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/concepts/concepts.html
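
As a rough illustration of the arithmetic above: each logical operator
contributes as many parallel instances as its parallelism. This is plain
Scala, not Flink API; the Operator class and the operator names are made up
for the sketch.

```scala
// Hypothetical model of a job, for illustration only (not Flink API).
object ParallelismSketch {
  // A logical operator together with the parallelism it runs at.
  final case class Operator(name: String, parallelism: Int)

  // Each logical operator runs as `parallelism` parallel instances.
  def totalInstances(ops: Seq[Operator]): Int = ops.map(_.parallelism).sum

  def main(args: Array[String]): Unit = {
    val sources = Seq(Operator("source-1", 8), Operator("source-2", 8))
    println(totalInstances(sources)) // prints 16
  }
}
```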

On Thu, Dec 15, 2016 at 3:21 PM, Jamie Grier <ja...@data-artisans.com>
wrote:

> All streams can be parallelized in Flink even with only one source.  You
> can have multiple sinks as well.


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com

Re: Multiple consumers and custom triggers

Posted by Jamie Grier <ja...@data-artisans.com>.
All streams can be parallelized in Flink even with only one source.  You
can have multiple sinks as well.

On Thu, Dec 15, 2016 at 7:56 AM, Meghashyam Sandeep V <
vr1meghashyam@gmail.com> wrote:

> 1. If we have multiple sources, can the streams be parallelized?
> 2. Can we have multiple sinks as well?

Re: Multiple consumers and custom triggers

Posted by Meghashyam Sandeep V <vr...@gmail.com>.
1. If we have multiple sources, can the streams be parallelized?
2. Can we have multiple sinks as well?

On Dec 14, 2016 10:46 PM, <dr...@gmail.com> wrote:

> Got it. Thanks!

Re: Multiple consumers and custom triggers

Posted by dr...@gmail.com.
Got it. Thanks!

> On Dec 15, 2016, at 02:58, Jamie Grier <ja...@data-artisans.com> wrote:
> 
> Ahh, sorry, for #2: A single Flink job can have as many sources as you like. They can be combined in multiple ways, via things like joins or connect(). They can also be completely independent; in other words, the data flow graph can be completely disjoint. You never need to call execute() more than once. Just define your program, with as many sources as you want, and then call execute().
> 
> val stream1 = env.addSource(...)
> val stream2 = env.addSource(...)
> 
> stream1
>   .map(...)
>   .addSink(...)
> 
> stream2
>   .map(...)
>   .addSink(...)
> 
> env.execute() // this is all you need

Re: Multiple consumers and custom triggers

Posted by Jamie Grier <ja...@data-artisans.com>.
Ahh, sorry, for #2: A single Flink job can have as many sources as you
like. They can be combined in multiple ways, via things like joins or
connect(). They can also be completely independent; in other words, the
data flow graph can be completely disjoint. You never need to call
execute() more than once. Just define your program, with as many sources as
you want, and then call execute().

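// Two sources defined on the same execution environment
val stream1 = env.addSource(...)
val stream2 = env.addSource(...)

// First pipeline
stream1
  .map(...)
  .addSink(...)

// Second pipeline, completely independent of the first
stream2
  .map(...)
  .addSink(...)

env.execute() // this is all you need: one call runs both pipelines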

On Wed, Dec 14, 2016 at 4:02 PM, Matt <dr...@gmail.com> wrote:

> Hey Jamie,
>
> Ok with #1. I guess #2 is just not possible.
>
> I got it about #3. I just checked the code for the tumbling window
> assigner and noticed that a custom trigger only overrides the assigner's
> default trigger, not the way it assigns windows. It makes sense now.
>
> Regarding #4, after doing some more tests I think it's more complex than I
> first thought. I'll probably create another thread explaining that
> specific question in more detail.
>
> Thanks,
> Matt

Re: Multiple consumers and custom triggers

Posted by Matt <dr...@gmail.com>.
Hey Jamie,

Ok with #1. I guess #2 is just not possible.

I got it about #3. I just checked the code for the tumbling window assigner
and noticed that a custom trigger only overrides the assigner's default
trigger, not the way it assigns windows. It makes sense now.

Regarding #4, after doing some more tests I think it's more complex than I
first thought. I'll probably create another thread explaining that
specific question in more detail.

Thanks,
Matt

On Wed, Dec 14, 2016 at 2:52 PM, Jamie Grier <ja...@data-artisans.com>
wrote:

> For #1 there are a couple of ways to do this.  The easiest is probably
> stream1.connect(stream2).map(...) where the MapFunction maps the two
> input types to a common type that you can then process uniformly.
>
> For #3: there must always be a WindowAssigner specified.  There are some
> convenient ways to do this in the API, such as timeWindow() or
> window(TumblingProcessingTimeWindows.of(...)), but you must always do
> this, whether you provide your own trigger implementation or not.  The way
> to use window(...) with a custom trigger is just
> stream.keyBy(...).window(...).trigger(...).apply(...) or something
> similar.  Not sure if I answered your question, though.
>
> For #4: if I understand you correctly, that is exactly what
> countWindow(10, 1) already does.  For example, if your input data were a
> sequence of integers starting with 0, the output would be:
>
> (0)
> (0, 1)
> (0, 1, 2)
> (0, 1, 2, 3)
> (0, 1, 2, 3, 4)
> (0, 1, 2, 3, 4, 5)
> (0, 1, 2, 3, 4, 5, 6)
> (0, 1, 2, 3, 4, 5, 6, 7)
> (0, 1, 2, 3, 4, 5, 6, 7, 8)
> (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
> (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
> (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
> (3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
> (4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
> ...
> etc
>
> -Jamie

Re: Multiple consumers and custom triggers

Posted by Jamie Grier <ja...@data-artisans.com>.
For #1 there are a couple of ways to do this.  The easiest is probably
stream1.connect(stream2).map(...) where the MapFunction maps the two input
types to a common type that you can then process uniformly.
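
A rough sketch of the idea, in plain Scala rather than Flink API: union()
only accepts streams of the same element type, so each input is first mapped
to a common type, and the merged result is then counted per key. Order,
Click, and the helper names are hypothetical; in a real job the two mapping
functions would be the pair passed to the map on the connected streams, and
countByKey stands in for keyBy plus a windowed count.

```scala
// Plain-Scala illustration only; none of these names come from Flink.
object CommonTypeSketch {
  // Two unrelated element types, as they might arrive from two topics.
  final case class Order(id: Long)
  final case class Click(url: String)

  // The common type both inputs are mapped to: (key, count contribution).
  type Tagged = (String, Int)

  // One mapping function per input type; the key is the case class name.
  def fromOrder(o: Order): Tagged = (o.productPrefix, 1)
  def fromClick(c: Click): Tagged = (c.productPrefix, 1)

  // Sums the contributions per key.
  def countByKey(tagged: Seq[Tagged]): Map[String, Int] =
    tagged.groupBy(_._1).map { case (key, values) => key -> values.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val orders = Seq(Order(1L), Order(2L))
    val clicks = Seq(Click("/a"), Click("/b"), Click("/c"))
    // Once both inputs share a type they can be processed uniformly.
    val merged = orders.map(fromOrder) ++ clicks.map(fromClick)
    println(countByKey(merged))
  }
}
```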

For #3: there must always be a WindowAssigner specified.  There are some
convenient ways to do this in the API, such as timeWindow() or
window(TumblingProcessingTimeWindows.of(...)), but you must always do
this, whether you provide your own trigger implementation or not.  The
way to use window(...) with a custom trigger is just
stream.keyBy(...).window(...).trigger(...).apply(...) or something
similar.  Not sure if I answered your question, though.
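
To make the division of labour concrete, here is a toy model in plain Scala
(not Flink API; Window, tumblingAssign, and countTriggerFires are invented
for this sketch). The assigner decides which window an element belongs to,
and the trigger decides when a window is evaluated, so swapping the trigger
leaves the assignment untouched. The assigner assumes non-negative
timestamps and no window offset.

```scala
// Toy model of the assigner/trigger split; illustration only.
object AssignerVsTriggerSketch {
  // A window identified by its [start, end) bounds in milliseconds.
  final case class Window(start: Long, end: Long)

  // Assigner: decides WHICH window an element with timestamp `ts` falls into.
  def tumblingAssign(sizeMs: Long)(ts: Long): Window = {
    val start = ts - (ts % sizeMs)
    Window(start, start + sizeMs)
  }

  // Trigger: decides WHEN a window fires. This one fires every `n` elements.
  def countTriggerFires(n: Int)(elementsSoFar: Int): Boolean =
    elementsSoFar > 0 && elementsSoFar % n == 0

  def main(args: Array[String]): Unit = {
    val assign = tumblingAssign(5000L) _
    // The window assignment is the same whichever trigger is used.
    Seq(0L, 4999L, 5000L).foreach(ts => println(s"$ts -> ${assign(ts)}"))
    // A different trigger only changes the firing points.
    println((1 to 6).filter(countTriggerFires(3)))
  }
}
```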

For #4: if I understand you correctly, that is exactly what
countWindow(10, 1) already does.  For example, if your input data were a
sequence of integers starting with 0, the output would be:

(0)
(0, 1)
(0, 1, 2)
(0, 1, 2, 3)
(0, 1, 2, 3, 4)
(0, 1, 2, 3, 4, 5)
(0, 1, 2, 3, 4, 5, 6)
(0, 1, 2, 3, 4, 5, 6, 7)
(0, 1, 2, 3, 4, 5, 6, 7, 8)
(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
(4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
...
etc
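
The listing above can be reproduced with a small simulation. This is a
sketch in plain Scala, not Flink code: slidingCountWindows is a hypothetical
helper that keeps the last `size` elements and emits them every `slide`
elements, which is, as far as I understand it, how countWindow(size, slide)
behaves (a global window combined with a count-based trigger and evictor).

```scala
// Simulation of a sliding count window; illustration only, not Flink API.
object CountWindowSketch {
  // Emits the retained elements every `slide` inputs, keeping at most `size`.
  def slidingCountWindows[A](input: Seq[A], size: Int, slide: Int): Seq[Seq[A]] = {
    require(size > 0 && slide > 0, "size and slide must be positive")
    var buffer = Vector.empty[A]
    val fired = Vector.newBuilder[Seq[A]]
    for ((elem, idx) <- input.zipWithIndex) {
      // Evictor role: retain only the most recent `size` elements.
      buffer = (buffer :+ elem).takeRight(size)
      // Trigger role: fire after every `slide` elements.
      if ((idx + 1) % slide == 0) fired += buffer
    }
    fired.result()
  }

  def main(args: Array[String]): Unit = {
    val windows = slidingCountWindows(0 to 13, size = 10, slide = 1)
    // Prints (0), then (0, 1), and so on, ending with (4, 5, ..., 13).
    windows.foreach(w => println(w.mkString("(", ", ", ")")))
  }
}
```

With slide = 1 the window fires on every element, so the early windows are
simply shorter than 10; nothing has to wait for the first 10 elements.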

-Jamie

