Posted to user@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2015/12/10 11:55:48 UTC

Getting two types of events from a Window (Trigger)?

Hi,

I'm working on something that uses the Flink Window feature.
I have written a custom Trigger to build the Window I need.

I am using the Window feature because I need state, and I need to expire
(and clean up) this state after a timeout (I use onEventTime to do that).
Because I need the data streaming in real time (augmented with the
mentioned state), I 'FIRE' after every event. Just before I 'PURGE' the
window, I need the fact that this purge happened (plus some statistics of
this Window) as a separate event in a separate 'DataStream'.

Now the interfaces of the various classes only support output as a single
Java type (a very sane choice).
So what I do right now is put these events on something 'external'
(HBase/Kafka) and read them back in via a different Source implementation.

My question: Is there a better way to do this?
Can I (for example) create a special 'Source' that I can pass as a
parameter to my Trigger, and then simply output a 'new event' from onEventTime?

What do you recommend?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
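The design described above can be modelled in a few lines of plain Java to show where the gap is. This is a toy sketch, not Flink code: the Decision enum only borrows the names of Flink's TriggerResult values, and ToyTimeoutWindow is a hypothetical class. What it illustrates is that a trigger only returns a decision; the window function is the only thing that emits data, with a single element type, so a PURGE on its own produces no record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical enum; the constant names follow Flink's TriggerResult values.
enum Decision { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

// Toy model of the window in this post: FIRE on every element and PURGE once
// event time has passed (last element timestamp + timeout). Not Flink code.
class ToyTimeoutWindow {
    private final long timeout;
    private final List<String> contents = new ArrayList<>();
    private long lastTimestamp;

    // Everything the "window function" emits. There is exactly one output
    // channel with one element type, and the trigger cannot write to it.
    final List<String> output = new ArrayList<>();

    ToyTimeoutWindow(long timeout) {
        this.timeout = timeout;
    }

    // Trigger side: only returns a decision, never emits data itself.
    Decision onElement(String element, long timestamp) {
        contents.add(element);
        lastTimestamp = Math.max(lastTimestamp, timestamp);
        return Decision.FIRE;
    }

    Decision onEventTime(long watermark) {
        boolean expired = !contents.isEmpty() && watermark >= lastTimestamp + timeout;
        return expired ? Decision.PURGE : Decision.CONTINUE;
    }

    // Operator side: acts on the decision. FIRE runs the "window function";
    // PURGE silently drops the contents, so nothing downstream learns of it.
    void apply(Decision decision) {
        if (decision == Decision.FIRE || decision == Decision.FIRE_AND_PURGE) {
            output.add("fired with " + contents.size() + " element(s)");
        }
        if (decision == Decision.PURGE || decision == Decision.FIRE_AND_PURGE) {
            contents.clear();
        }
    }
}
```

FIRE_AND_PURGE would let the window function see the final contents once more, but its output would still have the same element type as the per-event results.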

Re: Getting two types of events from a Window (Trigger)?

Posted by Stephan Ewen <se...@apache.org>.
If you strip out the parts that accumulate the data for writing the files,
it becomes a very slim example to build on top of...

On Fri, Dec 11, 2015 at 3:20 PM, Niels Basjes <Ni...@basjes.nl> wrote:

> I'll have another look at the example code you sent me.
> Thanks.
>
> On Fri, Dec 11, 2015 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Niels!
>>
>> Quick question (I am probably overlooking something here): if you simply
>> want to emit each element (Trigger onElement) in addition to a special data
>> stream, can you not simply have two operators that consume the original
>> data stream: the window operator and the additional source?
>>
>> If you need each element to pass through the window function anyway, I
>> think it may almost be easier to use the custom state with timeout example
>> I sent you a while back. There you have full flexibility and need not
>> distinguish between trigger state, window state, etc...
>>
>> Stephan
>>
>>
>>
>> On Fri, Dec 11, 2015 at 1:59 PM, Aljoscha Krettek <al...@apache.org> wrote:
>>
>>> Hi Niels,
>>> I’m afraid this will not work (if I understood correctly what you are
>>> trying to do). When the trigger is serialized/deserialized, each
>>> parallel instance of the trigger gets its own copy of the QueueSource
>>> object. Plus, a separate instance of the QueueSource itself will be running
>>> in each parallel instance of the source operator. And there is no way for
>>> the trigger and the source to communicate, since they might not even run
>>> on the same machine in the end.
>>>
>>> Cheers,
>>> Aljoscha
>>> > On 11 Dec 2015, at 13:11, Niels Basjes <Ni...@basjes.nl> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Just to let you know: I tried passing a SourceFunction, but I haven't
>>> > been able to get that to work (yet).
>>> >
>>> > I passed an instance of this (see code below) into my Trigger and
>>> > stored it there as:
>>> >     private QueueSource output;
>>> > and then called something like this from onElement:
>>> >    output.put("Foo", 1234);
>>> >
>>> > When I run this from my IDE I get two distinct instances of the queue
>>> > (effect: the stuff I put in doesn't come out at the other end).
>>> >
>>> > Any pointers on how (and whether) this can be fixed are welcome.
>>> > Only if this works will I look into making it generic (I got some
>>> > type-related exceptions when I tried that).
>>> >
>>> > Niels
>>> >
>>> >
>>> > (The code below is under the Apache 2.0 License, so copy, adapt, and
>>> > improve it if you want to.)
>>> > package nl.basjes.flink.experiments;
>>> >
>>> > import org.apache.flink.configuration.Configuration;
>>> > import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
>>> >
>>> > import java.util.concurrent.ConcurrentLinkedQueue;
>>> >
>>> > public class QueueSource extends RichEventTimeSourceFunction<String> {
>>> >     private static final long serialVersionUID = 1L;
>>> >
>>> >     private volatile boolean isRunning = true;
>>> >
>>> >     private ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();
>>> >
>>> >     @Override
>>> >     public void open(Configuration parameters) throws Exception {
>>> >         super.open(parameters);
>>> >     }
>>> >
>>> >     @Override
>>> >     public void close() throws Exception {
>>> >         super.close();
>>> >     }
>>> >
>>> >     @Override
>>> >     public void run(SourceContext<String> ctx) throws Exception {
>>> >         this.isRunning = true;
>>> >
>>> >         while (this.isRunning) {
>>> >             if (queue.isEmpty()) {
>>> >                 Thread.sleep(1); // Sleep 1 ms before retrying to dequeue again
>>> >                 continue;
>>> >             }
>>> >             QueueElement queueElement = queue.poll();
>>> >             ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
>>> >         }
>>> >     }
>>> >
>>> >     public void cancel() {
>>> >         this.isRunning = false;
>>> >     }
>>> >
>>> >     public void put(String element, long timestamp) {
>>> >         QueueElement queueElement = new QueueElement();
>>> >         queueElement.element = element;
>>> >         queueElement.timestamp = timestamp;
>>> >         queue.add(queueElement);
>>> >     }
>>> > }
>>> >
>>> > class QueueElement {
>>> >     String element;
>>> >     long timestamp;
>>> > }
>>> >
>>> >
>>> >
>>> > On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <Ni...@basjes.nl> wrote:
>>> > Thanks.
>>> >
>>> > The way I have solved it for now is by creating a class that persists
>>> > data into something external (right now HBase and/or Kafka) and using
>>> > that from the trigger to output the data.
>>> >
>>> > I have two follow-up questions:
>>> > 1) Is it possible to pass an instance of a 'SourceFunction' as such a
>>> > parameter (without breaking Flink)?
>>> > 2) To save resources, I use a single instance of my 'Extra data output
>>> > class' per Trigger instance, thus reusing the connections to the
>>> > outside world across multiple Window instances. Can I assume that a
>>> > single Trigger instance will only be used by a single thread? I.e.,
>>> > can I assume that I do not need locking and synchronization?
>>> >
>>> > Niels
>>> >
>>> > On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <se...@apache.org> wrote:
>>> > Hi Niels!
>>> >
>>> > I think there is no clean way to emit data from a trigger right now;
>>> > you can only emit data from the window functions.
>>> >
>>> > You can emit two different kinds of data types using an "Either" type.
>>> > This is built into Scala; in Java we added it in 1.0-SNAPSHOT:
>>> >
>>> > https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
>>> >
>>> > Maybe being able to emit different types of elements helps your use
>>> > case...
>>> >
>>> >
>>> > These types of questions have been coming up quite a bit, with people
>>> > looking to do different actions inside the windows on different
>>> > triggers (on element, on event time).
>>> >
>>> > As per a discussion with Aljoscha, one way to make this more flexible
>>> > is to enhance what you can do with custom state:
>>> >   - State has timeouts (for cleanup)
>>> >   - Functions allow you to schedule event-time progress notifications
>>> >
>>> > Stephan
>>> >
>>>
>>>
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
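Stephan's suggestion (quoted above) of routing both kinds of records through an "Either" type could look roughly like this. The sketch uses a hypothetical minimal Either written from scratch rather than Flink's class, and plain lists stand in for streams: the window function would emit Left values for the per-element enriched events and Right values for the purge summaries (here just a count), and each downstream consumer keeps only the branch it needs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal Either for illustration only; Flink's own Either class
// (linked in Stephan's mail) has its own API and is not used here.
abstract class Either<L, R> {
    static <L, R> Either<L, R> ofLeft(L value) { return new Left<>(value); }
    static <L, R> Either<L, R> ofRight(R value) { return new Right<>(value); }

    abstract boolean isLeft();
    abstract L getLeft();
    abstract R getRight();

    private static final class Left<L, R> extends Either<L, R> {
        private final L value;
        Left(L value) { this.value = value; }
        boolean isLeft() { return true; }
        L getLeft() { return value; }
        R getRight() { throw new IllegalStateException("Left has no right value"); }
    }

    private static final class Right<L, R> extends Either<L, R> {
        private final R value;
        Right(R value) { this.value = value; }
        boolean isLeft() { return false; }
        L getLeft() { throw new IllegalStateException("Right has no left value"); }
        R getRight() { return value; }
    }
}

// Splitting one mixed "stream" back into its two kinds.
class EitherSplit {
    // Each branch is "keep one side, then unwrap", done once per kind.
    static <L, R> List<L> lefts(List<Either<L, R>> mixed) {
        List<L> out = new ArrayList<>();
        for (Either<L, R> e : mixed) {
            if (e.isLeft()) out.add(e.getLeft());
        }
        return out;
    }

    static <L, R> List<R> rights(List<Either<L, R>> mixed) {
        List<R> out = new ArrayList<>();
        for (Either<L, R> e : mixed) {
            if (!e.isLeft()) out.add(e.getRight());
        }
        return out;
    }
}
```

In a real job the two branches would presumably be two filter/map steps on the same Either-typed stream; the exact Flink calls are deliberately not shown here.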

Re: Getting two types of events from a Window (Trigger)?

Posted by Niels Basjes <Ni...@basjes.nl>.
I'll have another look at the example code you sent me.
Thanks.

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Getting two types of events from a Window (Trigger)?

Posted by Stephan Ewen <se...@apache.org>.
Hi Niels!

Quick question (I am probably overlooking something here): if you simply
want to emit each element (Trigger onElement) in addition to a special data
stream, can you not simply have two operators that consume the original
data stream: the window operator and the additional source?

If you need each element to pass through the window function anyway, I
think it may almost be easier to use the custom state with timeout example
I sent you a while back. There you have full flexibility and need not
distinguish between trigger state, window state, etc...

Stephan
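Stephan's own custom-state-with-timeout example is not part of this thread, so the following is only a hypothetical plain-Java model of the idea: per-key state, an event-time deadline per key (the "timeout"), an output record on every element, and a summary record once the watermark passes the deadline. StateWithTimeout and its methods are invented names; in a real job the two record kinds would still need a common type (for example the Either type mentioned earlier in the thread).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of "custom keyed state with a timeout": no Flink types involved.
// Assumes elements and watermarks arrive in event-time order.
class StateWithTimeout {
    private final long timeout;
    private final Map<String, Integer> countPerKey = new HashMap<>();
    private final Map<String, Long> deadlinePerKey = new HashMap<>(); // the "timers"

    // One output channel; each record says which kind of event it is.
    final List<String> output = new ArrayList<>();

    StateWithTimeout(long timeout) {
        this.timeout = timeout;
    }

    // Per element: update state, push the cleanup deadline out, emit right away.
    void onElement(String key, String value, long timestamp) {
        int count = countPerKey.merge(key, 1, Integer::sum);
        deadlinePerKey.merge(key, timestamp + timeout, Math::max);
        output.add("EVENT " + key + ":" + value + " count=" + count);
    }

    // Event-time progress: every expired key emits a summary, then is cleared.
    void onWatermark(long watermark) {
        // Iterate over a sorted copy so removal is safe and order is deterministic.
        for (Map.Entry<String, Long> timer : new TreeMap<>(deadlinePerKey).entrySet()) {
            if (timer.getValue() <= watermark) {
                String key = timer.getKey();
                output.add("PURGED " + key + " count=" + countPerKey.get(key));
                countPerKey.remove(key);
                deadlinePerKey.remove(key);
            }
        }
    }
}
```

Because the deadline is pushed out on every element, a key is only cleaned up after it has been idle for `timeout` units of event time, which matches the expire-after-timeout behaviour from the original post.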




Re: Getting two types of events from a Window (Trigger)?

Posted by Niels Basjes <Ni...@basjes.nl>.
I sort of expected this to be the case.
Thanks for confirming.

Niels



-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
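Aljoscha's explanation of the two distinct queues can be reproduced without Flink: every deserialization produces a new, independent object graph, even inside a single JVM such as an IDE run. QueueHolder is a stripped-down stand-in for QueueSource, and roundTrip only roughly mimics how a function object is shipped to a task in serialized form; both names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;

// Stripped-down stand-in for QueueSource: only the queue, no Flink types.
class QueueHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
}

class SerializationCopies {
    // Serialize and deserialize an object, roughly like a function object
    // that is shipped to a task and unpacked there.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // How many elements the "source" copy sees after the "trigger" copy
    // has enqueued one.
    static int elementsVisibleToSource() {
        QueueHolder inUserCode = new QueueHolder();
        QueueHolder triggerCopy = roundTrip(inUserCode); // unpacked in the window task
        QueueHolder sourceCopy = roundTrip(inUserCode);  // unpacked in the source task
        triggerCopy.queue.add("Foo");
        return sourceCopy.queue.size();
    }
}
```

The element added through the trigger's copy never reaches the source's copy, which is why an external channel (the HBase/Kafka approach from earlier in the thread) or keeping both outputs in one operator is needed.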

Re: Getting two types of events from a Window (Trigger)?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Niels,
I’m afraid this will not work (if I understood correctly what you are trying to do). When the trigger is serialized/deserialized, each parallel instance of the trigger gets its own copy of the QueueSource object. Plus, a separate instance of the QueueSource itself will be running in each parallel instance of the source operator. And there is no way for the trigger and the source to communicate, since they might not even run on the same machine in the end.

Cheers,
Aljoscha
> On 11 Dec 2015, at 13:11, Niels Basjes <Ni...@basjes.nl> wrote:
> 
> Hi,
> 
> Just to let you know: I tried passing a SourceFunction but I haven't been able to get that to work (yet).
> 
> I passed an instance of this (see code below) into my Trigger and stored it there as:
>     private QueueSource output;
> and then called something like this from onElement:
>    output.put("Foo", 1234);
> 
> When I run this from my IDE I get two distinct instances of the queue (effect: the stuff I put in doesn't come out at the other end).
> 
> Any pointers on how (and whether) this can be fixed are welcome.
> Only if this works will I look into making it generic (I got some type-related exceptions when I tried that).
> 
> Niels
> 
> 
> (The code below is under the Apache 2.0 License, so copy, adapt, and improve it if you want to.)
> package nl.basjes.flink.experiments;
> 
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
> 
> import java.util.concurrent.ConcurrentLinkedQueue;
> 
> public class QueueSource extends RichEventTimeSourceFunction<String> {
>     private static final long serialVersionUID = 1L;
> 
>     private volatile boolean isRunning = true;
> 
>     private ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();
> 
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>     }
> 
>     @Override
>     public void close() throws Exception {
>         super.close();
>     }
> 
>     @Override
>     public void run(SourceContext<String> ctx) throws Exception {
>         this.isRunning = true;
> 
>         while (this.isRunning) {
>             if (queue.isEmpty()) {
>                 Thread.sleep(1); // Sleep 1 ms before retrying to dequeue again
>                 continue;
>             }
>             QueueElement queueElement = queue.poll();
>             ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
>         }
>     }
> 
>     public void cancel() {
>         this.isRunning = false;
>     }
> 
>     public void put(String element, long timestamp) {
>         QueueElement queueElement = new QueueElement();
>         queueElement.element = element;
>         queueElement.timestamp = timestamp;
>         queue.add(queueElement);
>     }
> }
> 
> class QueueElement {
>     String element;
>     long timestamp;
> }
> 
> 
> 
> On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <Ni...@basjes.nl> wrote:
> Thanks.
> 
> The way I solved it now is by creating a class that persists data into something external (right now HBase and/or Kafka) and use that from the trigger to output the data.
> 
> I have two followup questions:
> 1) Is it possible to pass an instance of  'SourceFunction' as such a parameter (without breaking Flink)?
> 2) I want to save resources so I'm using a single instance of my 'Extra data output class' in the instance of the Trigger. Thus reusing the connections to the outside over multiple Window instances. Can I assume that a single instance of Trigger will only be used by a single thread? I.e. Can I assume that I do not need locking and synchronization?
> 
> Niels
> 
> On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi Niels!
> 
> I think there is no clean way to emit data from a trigger right now, you can only emit data from the window functions.
> 
> You can emit two different kind of data types using an "Either" type. This is built-in in Scala, in Java we added it on 1.0-SNAPSHOT:
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
> 
> Maybe being able to emit different type of elements helps your use case...
> 
> 
> These types of questions have been coming up quite a bit, people looking to do different actions inside the windows on different triggers (on element, on event time).
> 
> As per discussion with Aljoscha, one way to make this more flexible is to enhance what you can do with custom state:
>   - State has timeouts (for cleanup)
>   - Functions allow you to schedule event-time progress notifications
> 
> Stephan
> 
> 
> 
> On Thu, Dec 10, 2015 at 11:55 AM, Niels Basjes <Ni...@basjes.nl> wrote:
> Hi,
> 
> I'm working on something that uses the Flink Window feature.
> I have written a custom Trigger to build the Window I need.
> 
> I am using the Window feature because I need state and I need to expire (and clean) this state after a timeout (I use the onEventTime to do that).
> Because I need the data streaming in real time (augmented with the mentioned state) I 'FIRE' after every event. Just before I 'PURGE' the window I need the fact of this purge (and some of the stats of this Window) as a separate event in a separate 'DataStream'.
> 
> Now the interfaces of the various classes only support output as a single java type (very sane choice).
> So what I do right now is put my events on something 'external' (HBase/Kafka) and read it in via a different Source implementation.
> 
> My question: Is there a better way to do this?
> Can I (for example) create a special 'Source' that I can pass as a parameter to my Trigger and then onEventTime just output a 'new event' ?
> 
> What do you recommend?
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
> 
> 
> 
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
> 
> 
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes


Re: Getting two types of events from a Window (Trigger)?

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi,

Just to let you know: I tried passing a SourceFunction but I haven't been
able to get that to work (yet).

I passed an instance of this (see code below) into my Trigger and stored it
there as:
    private QueueSource output;
and then called something like this from onElement:
   output.put("Foo",1234);

When I run this from my IDE I get two distinct instances of the queue
(effect: the stuff I put in doesn't come out at the other end).

Any pointers how (and if) this can be fixed are welcome.
Only if this works will I look into making this generic (I got some
type-related exceptions when I tried that).

Niels


(The code below is under the Apache 2.0 License, so copy, adapt and improve it if you want to.)

package nl.basjes.flink.experiments;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;

import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueSource extends RichEventTimeSourceFunction<String> {
    private static final long serialVersionUID = 1L;

    private volatile boolean isRunning = true;

    private ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        this.isRunning = true;

        while (this.isRunning) {
            if (queue.isEmpty()) {
                Thread.sleep(1); // Sleep 1 ms before retrying to dequeue again
                continue;
            }
            QueueElement queueElement = queue.poll();
            ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    public void put(String element, long timestamp) {
        QueueElement queueElement = new QueueElement();
        queueElement.element = element;
        queueElement.timestamp = timestamp;
        queue.add(queueElement);
    }
}

class QueueElement {
    String element;
    long timestamp;
}
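As an aside that does not address the cross-operator problem: within a single JVM, the 1 ms sleep-and-retry loop in run() can be replaced by a blocking poll with a timeout. This is a minimal plain-Java sketch that assumes producer and consumer really share the same queue object (which, per Aljoscha's reply, is not the case for a Trigger and a separate source operator). BlockingHandoff is a hypothetical name, and a java.util.function.Consumer stands in for Flink's SourceContext.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class BlockingHandoff {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private volatile boolean isRunning = true;

    // Producer side: never blocks, because the queue is unbounded.
    public void put(String element) {
        queue.add(element);
    }

    public void cancel() {
        isRunning = false;
    }

    // Consumer side: waits up to 10 ms for an element instead of sleeping and
    // re-checking isEmpty(), so the loop still notices cancel() promptly.
    public void run(Consumer<String> collector) {
        try {
            while (isRunning) {
                String element = queue.poll(10, TimeUnit.MILLISECONDS);
                if (element != null) {
                    collector.accept(element);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag and stop
        }
    }
}
```

For example, after put("Foo") and put("Bar"), a run() whose consumer calls cancel() after the second element returns with both elements delivered in order. Polling with a timeout avoids the check-then-poll pattern and wakes up as soon as data arrives.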




On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <Ni...@basjes.nl> wrote:

> Thanks.
>
> The way I solved it for now is by creating a class that persists data into
> something external (right now HBase and/or Kafka) and using that from the
> trigger to output the data.
>
> I have two follow-up questions:
> 1) Is it possible to pass an instance of 'SourceFunction' as such a
> parameter (without breaking Flink)?
> 2) I want to save resources, so I'm using a single instance of my 'Extra
> data output class' in the instance of the Trigger, thus reusing the
> connections to the outside over multiple Window instances. Can I assume
> that a single instance of Trigger will only be used by a single thread,
> i.e., that I do not need locking and synchronization?
>
> Niels
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>



-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Getting two types of events from a Window (Trigger)?

Posted by Niels Basjes <Ni...@basjes.nl>.
Thanks.

The way I solved it for now is by creating a class that persists data into
something external (right now HBase and/or Kafka) and using that from the
trigger to output the data.

I have two follow-up questions:
1) Is it possible to pass an instance of 'SourceFunction' as such a
parameter (without breaking Flink)?
2) I want to save resources, so I'm using a single instance of my 'Extra
data output class' in the instance of the Trigger, thus reusing the
connections to the outside over multiple Window instances. Can I assume
that a single instance of Trigger will only be used by a single thread,
i.e., that I do not need locking and synchronization?

Niels

On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Niels!
>
> I think there is no clean way to emit data from a trigger right now; you
> can only emit data from the window functions.
>
> You can emit two different kinds of data types using an "Either" type. This
> is built into Scala; in Java we added it in 1.0-SNAPSHOT:
>
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
>
> Maybe being able to emit different types of elements helps your use case...
>
>
> These types of questions have been coming up quite a bit: people looking
> to perform different actions inside the windows on different triggers (on
> element, on event time).
>
> As per discussion with Aljoscha, one way to make this more flexible is to
> enhance what you can do with custom state:
>   - State has timeouts (for cleanup)
>   - Functions allow you to schedule event-time progress notifications
>
> Stephan
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Getting two types of events from a Window (Trigger)?

Posted by Stephan Ewen <se...@apache.org>.
Hi Niels!

I think there is no clean way to emit data from a trigger right now; you
can only emit data from the window functions.

You can emit two different kinds of data types using an "Either" type. This
is built into Scala; in Java we added it in 1.0-SNAPSHOT:
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java

Maybe being able to emit different types of elements helps your use case...

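To make the Either suggestion concrete, here is a plain-Java sketch of the pattern: one function emits both regular events (Left) and a summary record (Right) into a single output, and downstream the mixed output is split by type. The Either class here is a hypothetical minimal stand-in written for illustration, not the Flink class linked above. The finalFiring flag is also an assumption: the window function needs some way to know that it is producing the last output before the purge.

```java
import java.util.ArrayList;
import java.util.List;

public class EitherOutputSketch {

    // Hypothetical minimal Either written for this sketch; it only mirrors the
    // idea of Flink's typeutils.Either and is not that class.
    public abstract static class Either<L, R> {
        public abstract boolean isLeft();
        public abstract L left();
        public abstract R right();

        public static <L, R> Either<L, R> ofLeft(L value) { return new Left<>(value); }
        public static <L, R> Either<L, R> ofRight(R value) { return new Right<>(value); }
    }

    private static final class Left<L, R> extends Either<L, R> {
        private final L value;
        Left(L value) { this.value = value; }
        @Override public boolean isLeft() { return true; }
        @Override public L left() { return value; }
        @Override public R right() { throw new IllegalStateException("Left has no right value"); }
    }

    private static final class Right<L, R> extends Either<L, R> {
        private final R value;
        Right(R value) { this.value = value; }
        @Override public boolean isLeft() { return false; }
        @Override public L left() { throw new IllegalStateException("Right has no left value"); }
        @Override public R right() { return value; }
    }

    // Hypothetical window-function body: every element is emitted as a Left
    // (tagged with the window size); when finalFiring is true, one Right with
    // the element count is emitted as the purge summary.
    public static List<Either<String, Long>> emitWindow(List<String> elements, boolean finalFiring) {
        List<Either<String, Long>> out = new ArrayList<>();
        for (String element : elements) {
            out.add(Either.ofLeft(element + "@" + elements.size()));
        }
        if (finalFiring) {
            out.add(Either.ofRight((long) elements.size()));
        }
        return out;
    }

    // Downstream split, part 1: keep only the regular events.
    public static List<String> events(List<Either<String, Long>> mixed) {
        List<String> result = new ArrayList<>();
        for (Either<String, Long> e : mixed) {
            if (e.isLeft()) {
                result.add(e.left());
            }
        }
        return result;
    }

    // Downstream split, part 2: keep only the purge summaries.
    public static List<Long> purgeSummaries(List<Either<String, Long>> mixed) {
        List<Long> result = new ArrayList<>();
        for (Either<String, Long> e : mixed) {
            if (!e.isLeft()) {
                result.add(e.right());
            }
        }
        return result;
    }
}
```

In a Flink job the split would presumably be two filter/map pairs on the same DataStream, one keeping the Left values and one keeping the Right values, so a single operator output feeds two differently typed streams.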

These types of questions have been coming up quite a bit: people looking to
perform different actions inside the windows on different triggers (on element,
on event time).

As per discussion with Aljoscha, one way to make this more flexible is to
enhance what you can do with custom state:
  - State has timeouts (for cleanup)
  - Functions allow you to schedule event-time progress notifications
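Below is a rough plain-Java simulation of how those two features could work together, under stated assumptions. Per-key state is a map, registering an event-time notification means recording a deadline, and advancing the watermark fires every notification that is due. When a key's state times out, a separate "expired" record is emitted and the state is cleared, which is the behaviour Niels builds with FIRE/PURGE. All names (TimedStateSketch, processElement, advanceWatermark) are hypothetical and not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TimedStateSketch {
    private final long timeout;
    private final Map<String, Long> countPerKey = new HashMap<>();      // per-key state
    private final Map<String, Long> deadlinePerKey = new HashMap<>();   // current timeout per key
    private final TreeMap<Long, List<String>> timers = new TreeMap<>(); // event-time notifications
    public final List<String> mainOutput = new ArrayList<>();
    public final List<String> expiredOutput = new ArrayList<>();

    public TimedStateSketch(long timeout) {
        this.timeout = timeout;
    }

    // Update state, emit immediately (like FIRE on every element), and schedule
    // a notification for when this key's state should time out.
    public void processElement(String key, long timestamp) {
        long count = countPerKey.merge(key, 1L, Long::sum);
        mainOutput.add(key + ":" + count);
        long deadline = timestamp + timeout;
        deadlinePerKey.put(key, deadline);
        timers.computeIfAbsent(deadline, t -> new ArrayList<>()).add(key);
    }

    // Fire every notification whose time is <= watermark. A key whose deadline
    // moved forward (because a newer element arrived) ignores its old notification.
    public void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                Long current = deadlinePerKey.get(key);
                if (current == null || !current.equals(due.getKey())) {
                    continue; // stale notification
                }
                long count = countPerKey.remove(key);
                deadlinePerKey.remove(key);
                expiredOutput.add("expired:" + key + ":" + count); // the separate "purge" event
            }
        }
    }
}
```

With a timeout of 10, elements for key a at times 1 and 5 and for key b at time 3 produce a:1, a:2, b:1 on the main output. A watermark of 12 expires nothing (a's first notification is stale), and a watermark of 20 emits expired:b:1 and then expired:a:2. Stale notifications are skipped rather than deleted, which keeps the timer bookkeeping simple.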

Stephan



On Thu, Dec 10, 2015 at 11:55 AM, Niels Basjes <Ni...@basjes.nl> wrote:

> Hi,
>
> I'm working on something that uses the Flink Window feature.
> I have written a custom Trigger to build the Window I need.
>
> I am using the Window feature because I need state and I need to expire
> (and clean) this state after a timeout (I use the onEventTime to do that).
> Because I need the data streaming in real time (augmented with the
> mentioned state) I 'FIRE' after every event. Just before I 'PURGE' the
> window I need the fact of this purge (and some of the stats of this Window)
> as a separate event in a separate 'DataStream'.
>
> Now the interfaces of the various classes only support output as a single
> Java type (very sane choice).
> So what I do right now is put my events on something 'external'
> (HBase/Kafka) and read them back in via a different Source implementation.
>
> My question: Is there a better way to do this?
> Can I (for example) create a special 'Source' that I can pass as a
> parameter to my Trigger and then onEventTime just output a 'new event'?
>
> What do you recommend?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>