Posted to user@flink.apache.org by Samim Ahmed <sa...@gmail.com> on 2017/05/22 07:06:24 UTC

Need help on Streaming API | Flink | GlobalWindow and Customized Trigger

Hello All,


Hope you are doing well.



My name is Samim, and I am working on a POC (proof of concept) for a project. In
this project we are using Apache Flink to process stream data, find
the required patterns, and finally store those patterns in a DB.



To implement this we have used a global window with a customized trigger.

While testing we observed that the output is as expected, but we are
losing the last few minutes of data when the input stream ends.



For example, if the data streaming started at 1 pm and ended at 5 pm on the
same day, we found that the output data was missing for the period from
4:55 pm to 5 pm. We also observed that when the input stream finishes, the
entire process stops immediately and the last few minutes of data remain
inside the window.



We need your help to overcome this missing-data issue, as I am new to the
Flink framework. Is there an API available to solve this problem, or is it
a Flink limitation?



It’ll be great if you could share your views, and do let me know if you need
any further information.



I am waiting for your inputs. Thanks in advance.



Thanks,

Samim.

Re: Need help on Streaming API | Flink | GlobalWindow and Customized Trigger

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I think the problem is that the Trigger only uses processing time to determine when to trigger. If the job shuts down (which happens when the sources shut down and the whole pipeline is flushed) pending processing-time triggers are not fired.

You can use the fact that when sources shut down they emit a Long.MAX_VALUE watermark to signal that there won’t be any data in the future. For this, you have to enable event time (you can do this via env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)) and then register an event-time timer in your Trigger for Long.MAX_VALUE. This will call your onEventTime() method and allow the Trigger to fire before shutting down.
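A minimal sketch of such a Trigger (assuming the Flink 1.x Trigger API for GlobalWindow; the firing interval and class name here are illustrative, not taken from the attached code):

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires periodically on processing time, but also registers an
// event-time timer for Long.MAX_VALUE so that the final watermark
// emitted when the sources shut down flushes whatever is still
// buffered in the window.
public class FlushingTrigger extends Trigger<Object, GlobalWindow> {

    private static final long INTERVAL_MS = 60_000; // illustrative

    @Override
    public TriggerResult onElement(Object element, long timestamp,
            GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(
                ctx.getCurrentProcessingTime() + INTERVAL_MS);
        // The Long.MAX_VALUE watermark arrives when the sources shut down.
        ctx.registerEventTimeTimer(Long.MAX_VALUE);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
            TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window,
            TriggerContext ctx) throws Exception {
        // Reached via the final Long.MAX_VALUE watermark: emit the
        // remaining window contents before the job shuts down.
        return time == Long.MAX_VALUE ? TriggerResult.FIRE
                                      : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx)
            throws Exception {
        ctx.deleteEventTimeTimer(Long.MAX_VALUE);
    }
}
```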

Best,
Aljoscha

> On 24. May 2017, at 07:08, Samim Ahmed <sa...@gmail.com> wrote:
> 
> Hello Aljoscha,
> 
> Sorry to ping again; I am just asking for an update on my issue. 
> Did you get a chance to look at the trigger files? I am blocked, so please take a look when you have time. 
> Waiting for your answer, and again sorry for this mail.
> 
> Regards,
> Samim 
> 
> On Tue, May 23, 2017 at 1:00 AM, Samim Ahmed <samim1216@gmail.com <ma...@gmail.com>> wrote:
> Hello Aljoscha,
> 
> Thanks for having a look at this issue.
> 
> I have copied the main code showing the flow of execution, and the trigger code is attached to this mail.
> In main class:
> 
> AggregationProfile vtrAggProfile = new VtrAggregationProfile();
> 
> 1. Decode the input file.
> 2. Filter events:
> 	
> 		DataStream<EventBean> vtrFilteredStream = vtrDecodedEventsDs
> 				.filter(vtrAggProfile)
> 				.setParallelism(vtrParserParallelism);
> 
> 3. Correlate VTR records:
> 		
> 		DataStream<ISession> vtrSessionStream = vtrFilteredStream
> 				.keyBy(vtrAggProfile)
> 				.window(GlobalWindows.create())
> 				.trigger(vtrAggProfile)  // <=== this trigger causes the loss of the last few minutes of data
> 				.apply(vtrAggProfile)
> 				.setParallelism(maxParallelism);
> 
> Attached file names :
> 1. VtrAggregationProfile.java
> 2. AggregationProfile.java
> 
> 
> Please let me know if you need any other information. Thanks in advance .
> 
> 
> Regards,
> Samim
> 
> On Mon, May 22, 2017 at 6:30 PM, Aljoscha Krettek <aljoscha@apache.org <ma...@apache.org>> wrote:
> Hi,
> 
> If you could show us your custom Trigger, we might be able to figure out what’s going on.
> 
> Best,
> Aljoscha
> 
>> On 22. May 2017, at 09:06, Samim Ahmed <samim1216@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Hello All,
>> 
>> Hope you are doing well.
>>  
>> My name is Samim, and I am working on a POC (proof of concept) for a project. In this project we are using Apache Flink to process stream data, find the required patterns, and finally store those patterns in a DB.
>>  
>> To implement this we have used a global window with a customized trigger.
>> While testing we observed that the output is as expected, but we are losing the last few minutes of data when the input stream ends.
>>  
>> For example, if the data streaming started at 1 pm and ended at 5 pm on the same day, we found that the output data was missing for the period from 4:55 pm to 5 pm. We also observed that when the input stream finishes, the entire process stops immediately and the last few minutes of data remain inside the window.
>>  
>> We need your help to overcome this missing-data issue, as I am new to the Flink framework. Is there an API available to solve this problem, or is it a Flink limitation?
>>  
>> It’ll be great if you could share your views, and do let me know if you need any further information.
>>  
>> I am waiting for your inputs. Thanks in advance.
>>  
>> Thanks,
>> Samim.
> 
> 
> 
> 
> -- 
> Regards,
> Samim Ahmed 
> Mumbai
> 09004259232
> 
> 
> 
> 
> -- 
> Regards,
> Samim Ahmed 
> Mumbai
> 09004259232


Re: Need help on Streaming API | Flink | GlobalWindow and Customized Trigger

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

If you could show us your custom Trigger, we might be able to figure out what’s going on.

Best,
Aljoscha
> On 22. May 2017, at 09:06, Samim Ahmed <sa...@gmail.com> wrote:
> 
> Hello All,
> 
> Hope you are doing well.
>  
> My name is Samim, and I am working on a POC (proof of concept) for a project. In this project we are using Apache Flink to process stream data, find the required patterns, and finally store those patterns in a DB.
>  
> To implement this we have used a global window with a customized trigger.
> While testing we observed that the output is as expected, but we are losing the last few minutes of data when the input stream ends.
>  
> For example, if the data streaming started at 1 pm and ended at 5 pm on the same day, we found that the output data was missing for the period from 4:55 pm to 5 pm. We also observed that when the input stream finishes, the entire process stops immediately and the last few minutes of data remain inside the window.
>  
> We need your help to overcome this missing-data issue, as I am new to the Flink framework. Is there an API available to solve this problem, or is it a Flink limitation?
>  
> It’ll be great if you could share your views, and do let me know if you need any further information.
>  
> I am waiting for your inputs. Thanks in advance.
>  
> Thanks,
> Samim.