Posted to user@flink.apache.org by Sujit Sakre <su...@northgateps.com> on 2017/01/09 16:57:15 UTC

Sliding Event Time Window Processing: Window Function inconsistent behavior

Hi,

We are using a sliding event-time window with the Kafka consumer. The window
size is 6 minutes and the slide is 2 minutes. We have written a window
function that selects particular windows out of the many produced for a
keyed stream; based on a particular condition, it selects about 16 of them.

On a normal run, 16 windows satisfy the condition inside the window
function. We write each of these windows to a different file, named after
the window start and end times.
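As background on the overlap (a plain-Java sketch, independent of Flink): with a 6-minute size and a 2-minute slide, every record is assigned to size/slide = 3 overlapping windows, whose start timestamps can be computed as below. This mirrors the standard sliding-window assignment rule; the helper name is ours, not from the code in this thread.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingAssignment {
    // Start timestamps of all sliding windows containing an event at time ts.
    // The latest window starts at ts rounded down to a slide boundary; earlier
    // windows follow at one slide apart, as long as they still cover ts.
    static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slideMs);
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 6 * 60_000L;   // 6-minute window
        long slide = 2 * 60_000L;  // 2-minute slide
        // An event at minute 13 lands in the windows starting at minutes 12, 10 and 8.
        System.out.println(windowStarts(13 * 60_000L, size, slide));
    }
}
```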

The code is as follows:

Calling code


public class RealTimeProcessingSlidingWindow {

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", DEMO_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer =
                new FlinkKafkaConsumer09<>(
                        "test",            // Kafka topic name
                        new dataSchema(),
                        kafkaProps);

        DataStream<Tuple5<String, String, Float, Float, String>> stream1 =
                env.addSource(consumer);

        DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
                stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());

        keyedStream.keyBy(4)
                // 6 min window with 2 min slide
                .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
                .apply(new CustomSlidingWindowFunction());

        env.execute("Sliding Event Time Window Processing");
    }
}
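The BoundedOutOfOrdernessGenerator2 assigner used above is not shown in this post. For readers, the bounded-out-of-orderness rule such an assigner typically implements can be sketched in plain Java, outside Flink (the 10-second bound below is an assumed value for illustration): the watermark trails the largest event timestamp seen so far by a fixed bound, and a window fires once the watermark passes its end.

```java
// Plain-Java model of bounded-out-of-orderness watermarking (not a Flink class).
public class WatermarkSketch {
    static final long MAX_OUT_OF_ORDERNESS_MS = 10_000; // assumed 10 s bound

    // Initialized so that the first watermark cannot underflow Long.MIN_VALUE.
    long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

    // Called once per element with its event timestamp; returns the new watermark.
    long onEvent(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch();
        System.out.println(w.onEvent(100_000)); // watermark trails by 10 s
        System.out.println(w.onEvent(95_000));  // late event: watermark does not move back
    }
}
```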


public static class CustomSlidingWindowFunction implements
        WindowFunction<Tuple5<String, String, Float, Float, String>,
                Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window,
            Iterable<Tuple5<String, String, Float, Float, String>> input,
            Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {

        HashMap<String, Tuple5<String, String, Float, Float, String>> windowMap =
                new HashMap<String, Tuple5<String, String, Float, Float, String>>();
        for (Tuple5<String, String, Float, Float, String> wr : input) {
            windowMap.put(wr.f1.toString().trim(), wr);
        }

        ...

        SimpleDateFormat sf = new SimpleDateFormat(IndianDateTimeFormat);

        if (windowMap.containsKey(tk)) {
            Tuple5<String, String, Float, Float, String> t = windowMap.get(tk);

            Date d = sf.parse(t.f0.trim());

            ...

            // Condition for selecting a window
            if (d.after(x) && d.before(y)) {
                // Write the window output to separate files named after window Lat and Lon
                writeWindowToFile(t, window, input);
            }
        }
    }
}

// Get the buffered writer
private static synchronized BufferedWriter getWriter(String fileName) throws IOException {
    return new BufferedWriter(new FileWriter(fileName, true));
}

// Writes an entire window to file for the records in that window
private static synchronized void writeWindowToFile(
        Tuple5<String, String, Float, Float, String> target, TimeWindow window,
        Iterable<Tuple5<String, String, Float, Float, String>> input) throws IOException {
    // Create a file to write the window to
    String fileName = target.f2.toString() + "-" + target.f3.toString() + ".csv";
    BufferedWriter br = getWriter(fileName);

    // Iterate and put the records in the file
    for (Tuple5<String, String, Float, Float, String> tr : input) {
        br.write(tr.f1.toString().trim() + ", "
                + convertLongIntoDate(window.getStart()) + ", "
                + convertLongIntoDate(window.getEnd()) + ", "
                + tr.f0 + ", " + tr.f2 + ", " + tr.f3 + '\n');
    }
    // flush the writer and close it
    br.close();
}
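The selection test inside apply() can be exercised on its own, without Flink. The sketch below is under assumptions: the post's IndianDateTimeFormat constant is not shown, so a dd/MM/yyyy HH:mm:ss pattern stands in for it purely for illustration, and x and y stand for the selection bounds.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class WindowSelect {
    // Assumed stand-in for IndianDateTimeFormat. Note: SimpleDateFormat is not
    // thread-safe, so a shared static instance like this is only safe when
    // parsing happens from a single thread.
    static final SimpleDateFormat SF = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");

    // Same shape as the condition in apply(): parse the record's timestamp
    // string and keep it only if it falls strictly between x and y.
    static boolean selected(String recordTime, Date x, Date y) throws ParseException {
        Date d = SF.parse(recordTime.trim());
        return d.after(x) && d.before(y);
    }

    public static void main(String[] args) throws ParseException {
        Date x = SF.parse("09/01/2017 10:00:00");
        Date y = SF.parse("09/01/2017 10:06:00");
        System.out.println(selected("09/01/2017 10:03:00", x, y)); // true
        System.out.println(selected("09/01/2017 10:07:00", x, y)); // false
    }
}
```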

We have written the code to be thread-safe when creating and writing to
files.

If we execute this code multiple times on the Kafka stream (with a fixed
set of records), most of the time we get 16 files with the corresponding
window records, which is the correct behavior.

However, sometimes only 4 files, or 1 file, or some other number fewer
than 16, get created, seemingly at random; this is the anomalous behavior.

What could be the cause of such behavior? How do we resolve it?

Please could you identify the problem and suggest a solution.

Thanks.


*Sujit Sakre*

-- 
This email is sent on behalf of Northgate Public Services (UK) Limited and 
its associated companies including Rave Technologies (India) Pvt Limited 
(together "Northgate Public Services") and is strictly confidential and 
intended solely for the addressee(s). 
If you are not the intended recipient of this email you must: (i) not 
disclose, copy or distribute its contents to any other person nor use its 
contents in any way or you may be acting unlawfully;  (ii) contact 
Northgate Public Services immediately on +44(0)1908 264500 quoting the name 
of the sender and the addressee then delete it from your system.
Northgate Public Services has taken reasonable precautions to ensure that 
no viruses are contained in this email, but does not accept any 
responsibility once this email has been transmitted.  You should scan 
attachments (if any) for viruses.

Northgate Public Services (UK) Limited, registered in England and Wales 
under number 00968498 with a registered address of Peoplebuilding 2, 
Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2 
4NN.  Rave Technologies (India) Pvt Limited, registered in India under 
number 117068 with a registered address of 2nd Floor, Ballard House, Adi 
Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.

Re: Sliding Event Time Window Processing: Window Function inconsistent behavior

Posted by Aljoscha Krettek <al...@apache.org>.
Great! Thanks for letting us know.

On Wed, 11 Jan 2017 at 12:44 Sujit Sakre <su...@northgateps.com>
wrote:

> Hi Aljoscha,
>
> I have realized that the output stream is not defined separately in the
> code below, and hence the input values are getting in the sink. After
> defining a separate output stream it works.
>
> We have now confirmed that the windows are processed separately as per the
> groupings.
>
> Thanks.

Re: Sliding Event Time Window Processing: Window Function inconsistent behavior

Posted by Sujit Sakre <su...@northgateps.com>.
Hi Aljoscha,

I have realized that the output stream was not defined separately in the
code from my previous mail, and hence the raw input values were reaching
the sink. After defining a separate output stream, it works.

We have now confirmed that the windows are processed separately, as per the
groupings.

Thanks.
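For reference, the fix described above can be sketched as follows (a sketch only, reusing the types and class names from the code in this thread; not a tested job): assign the window operator's output to its own DataStream and attach the sink to that result stream, rather than to keyedStream.

```java
// The window operator's result gets its own stream; the sink is attached to
// that result stream, not to keyedStream (which would bypass the window
// function entirely and print every raw input record).
DataStream<Tuple5<String, String, Float, Float, String>> windowedStream =
        keyedStream.keyBy(4)
                .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
                .apply(new CustomSlidingWindowFunction());

// Only records emitted via out.collect(...) inside the window function
// reach this sink.
windowedStream.addSink(new SinkFunction<Tuple5<String, String, Float, Float, String>>() {
    private static final long serialVersionUID = 1L;

    public void invoke(Tuple5<String, String, Float, Float, String> value) {
        System.out.println(value.f1.toString().trim() + ", "
                + value.f0 + ", " + value.f2 + ", " + value.f3);
    }
});
```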


*Sujit Sakre*



Re: Sliding Event Time Window Processing: Window Function inconsistent behavior

Posted by Sujit Sakre <su...@northgateps.com>.
Hi Aljoscha,

Thanks.

I have used the following code for testing:

In main:

keyedStream.keyBy(4)
        // 6 min window with 2 min slide
        .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
        .apply(new CustomSlidingWindowFunction());

keyedStream.addSink(new SinkFunction<Tuple5<String, String, Float, Float, String>>() {
    private static final long serialVersionUID = 1L;

    public void invoke(Tuple5<String, String, Float, Float, String> value) {
        System.out.println(value.f1.toString().trim() + ", "
                + value.f0 + ", " + value.f2 + ", " + value.f3);
    }
});


In the WindowFunction's apply method:

...

// Condition for selecting a window
if (d.after(x) && d.before(y)) {
    for (Tuple5<String, String, Float, Float, String> tr : input) {
        // Write the window's records to the Collector
        out.collect(new Tuple5<>(tr.f0, tr.f1, tr.f2, tr.f3, tr.f4));
    }
}

I am getting all of the input records instead of only those from the
windows selected by the condition. Is there something I am doing wrong?
Does this need to be done in a different way?

Please let me know.

Thanks.



*Sujit Sakre*


On 10 January 2017 at 20:24, Aljoscha Krettek <al...@apache.org> wrote:

> Hi,
> instead of writing to files, could you please simply output a value using
> the Collector and then write the result stream of the window operation to a
> sink (such as a file sink) to see how many windows are being processed.
> Having side effects (especially output) in user functions can lead to
> programs with quite unexpected behaviour and I would highly discourage
> doing that.
>
> Cheers,
> Aljoscha
>
> On Tue, 10 Jan 2017 at 13:44 Sujit Sakre <su...@northgateps.com>
> wrote:
>
>> Hi,
>>
>> In the link (
>> http://stackoverflow.com/questions/41143518/sliding-processing-time-window-computes-inconsistent-results),
>> Fabian has mentioned that if Event Time is used, consistent results are
>> possible.
>>
>> However, that's not the case with us. We are getting very random results.
>>
>> Please suggest.
>>
>>
>> *Sujit Sakre*
>>
>>
>> On 9 January 2017 at 22:27, Sujit Sakre <su...@northgateps.com>
>> wrote:
>>
>> Hi,
>>
>> We are using Sliding Event Time Window with Kafka Consumer. The window
>> size is 6 minutes, and slide is 2 minutes. We have written a window
>> function to select a particular window out of multiple windows for a keyed
>> stream, e.g. we select about 16 windows out of multiple windows for the
>> keyed stream based on a particular condition.
>>
>> Upon a normal execution, we get 16 windows for processing inside the
>> condition (in window function mentioned). These windows we are putting in
>> different files, named after window start and end times.
>>
>> The code is as below:
>>
>> Calling code:
>>
>> public class RealTimeProcessingSlidingWindow {
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         // set up the execution environment
>>         StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>         // configure the Kafka consumer
>>         Properties kafkaProps = new Properties();
>>         kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
>>         kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
>>         kafkaProps.setProperty("group.id", DEMO_GROUP);
>>         // always read the Kafka topic from the start
>>         kafkaProps.setProperty("auto.offset.reset", "earliest");
>>
>>         FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer =
>>                 new FlinkKafkaConsumer09<>(
>>                         "test",            // Kafka topic name
>>                         new dataSchema(),
>>                         kafkaProps);
>>         DataStream<Tuple5<String, String, Float, Float, String>> stream1 =
>>                 env.addSource(consumer);
>>         DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
>>                 stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());
>>
>>         keyedStream.keyBy(4)
>>                 // 6 min window with a 2 min slide
>>                 .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
>>                 .apply(new CustomSlidingWindowFunction());
>>
>>         env.execute("Sliding Event Time Window Processing");
>>     }
>> }
>>
>>
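[Editorial aside: with a window size of 6 minutes and a slide of 2 minutes, every element is assigned to size / slide = 3 overlapping windows. A small standalone sketch of the assignment arithmetic, mirroring the logic of Flink's SlidingEventTimeWindows assigner with zero window offset (this is illustrative code, not the Flink API itself):]

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {

    // Start timestamps of all sliding windows containing the given event
    // timestamp (zero window offset), latest window first.
    static List<Long> windowStartsFor(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // An event at t = 7 min falls into the 6-minute windows starting
        // at minutes 6, 4 and 2, i.e. [6,12), [4,10) and [2,8).
        for (long start : windowStartsFor(7 * minute, 6 * minute, 2 * minute)) {
            System.out.println(start / minute);
        }
    }
}
```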
>> public static class CustomSlidingWindowFunction implements
>>         WindowFunction<Tuple5<String, String, Float, Float, String>,
>>                 Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {
>>
>>     @Override
>>     public void apply(Tuple key, TimeWindow window,
>>             Iterable<Tuple5<String, String, Float, Float, String>> input,
>>             Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {
>>
>>         HashMap<String, Tuple5<String, String, Float, Float, String>> windowMap =
>>                 new HashMap<String, Tuple5<String, String, Float, Float, String>>();
>>         for (Tuple5<String, String, Float, Float, String> wr : input) {
>>             windowMap.put(wr.f1.toString().trim(), wr);
>>         }
>>
>>         ...
>>
>>         SimpleDateFormat sf = new SimpleDateFormat(IndianDateTimeFormat);
>>
>>         if (windowMap.containsKey(tk)) {
>>             Tuple5<String, String, Float, Float, String> t = windowMap.get(tk);
>>
>>             Date d = sf.parse(t.f0.trim());
>>
>>             ...
>>
>>             // Condition for selecting a window
>>             if (d.after(x) && d.before(y)) {
>>                 // Write the window output to separate files named after Lat and Lon
>>                 writeWindowToFile(t, window, input);
>>             }
>>         }
>>     }
>>
>>     // Get the buffered writer
>>     private static synchronized BufferedWriter getWriter(String fileName)
>>             throws IOException {
>>         return new BufferedWriter(new FileWriter(fileName, true));
>>     }
>>
>>     // Writes all the records of a window to a file
>>     private static synchronized void writeWindowToFile(
>>             Tuple5<String, String, Float, Float, String> target, TimeWindow window,
>>             Iterable<Tuple5<String, String, Float, Float, String>> input) throws IOException {
>>         // Create a file to write the window to
>>         String fileName = target.f2.toString() + "-" + target.f3.toString() + ".csv";
>>         BufferedWriter br = getWriter(fileName);
>>
>>         // Iterate and put the records in the file
>>         for (Tuple5<String, String, Float, Float, String> tr : input) {
>>             br.write(tr.f1.toString().trim() + ", "
>>                     + convertLongIntoDate(window.getStart()) + ", "
>>                     + convertLongIntoDate(window.getEnd()) + ", "
>>                     + tr.f0 + ", " + tr.f2 + ", " + tr.f3 + '\n');
>>         }
>>         // closing the writer also flushes it
>>         br.close();
>>     }
>> }
>>
>> We have written the code to be thread-safe while creating and writing to
>> the files.
>>
>> If we execute this code multiple times on the same Kafka stream (with a
>> fixed set of records), most of the time we get 16 files with the
>> corresponding window records, which is the correct behavior.
>>
>> However, sometimes fewer than 16 files are created (e.g. only 4, or even
>> just 1), seemingly at random; this is anomalous behavior.
>>
>> What could be the cause of such behavior? How do we resolve this?
>>
>> Please could you identify the cause and suggest a solution.
>>
>> Thanks.
>>
>>
>> *Sujit Sakre*
>>
>>
>>
>> This email is sent on behalf of Northgate Public Services (UK) Limited
>> and its associated companies including Rave Technologies (India) Pvt
>> Limited (together "Northgate Public Services") and is strictly confidential
>> and intended solely for the addressee(s).
>> If you are not the intended recipient of this email you must: (i) not
>> disclose, copy or distribute its contents to any other person nor use its
>> contents in any way or you may be acting unlawfully;  (ii) contact
>> Northgate Public Services immediately on +44(0)1908 264500
>> <+44%201908%20264500> quoting the name of the sender and the addressee
>> then delete it from your system.
>> Northgate Public Services has taken reasonable precautions to ensure that
>> no viruses are contained in this email, but does not accept any
>> responsibility once this email has been transmitted.  You should scan
>> attachments (if any) for viruses.
>>
>> Northgate Public Services (UK) Limited, registered in England and Wales
>> under number 00968498 with a registered address of Peoplebuilding 2,
>> Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2
>> 4NN.  Rave Technologies (India) Pvt Limited, registered in India under
>> number 117068 with a registered address of 2nd Floor, Ballard House, Adi
>> Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
>>
>


Re: Sliding Event Time Window Processing: Window Function inconsistent behavior

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
instead of writing to files, could you please simply output a value using
the Collector and then write the result stream of the window operation to a
sink (such as a file sink), to see how many windows are being processed?
Having side effects (especially output) in user functions can lead to
programs with quite unexpected behaviour, and I would highly discourage
doing that.
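[Editorial aside: a minimal standalone sketch of this emit-then-sink pattern. This is not the actual Flink API: Collector below is a stand-in for org.apache.flink.util.Collector, and the window-selection condition is reduced to a simple predicate. The window function only emits the selected records; all file I/O is left to a sink at the end of the pipeline:]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class CollectorSketch {

    // Stand-in for org.apache.flink.util.Collector.
    interface Collector<T> {
        void collect(T record);
    }

    // The window function only *emits* matching records; writing them out
    // is the job of a sink, not of the user function.
    static void apply(Iterable<String> input, Predicate<String> selected,
                      Collector<String> out) {
        for (String record : input) {
            if (selected.test(record)) {
                out.collect(record);
            }
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();  // plays the role of a file sink
        apply(Arrays.asList("keep-1", "drop-2", "keep-3"),
              r -> r.startsWith("keep"), sink::add);
        System.out.println(sink);  // [keep-1, keep-3]
    }
}
```

[In the real job, the DataStream returned by .apply(new CustomSlidingWindowFunction()) could then be written by a single sink (e.g. writeAsText), making it easy to count in one place how many windows were actually processed.]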

Cheers,
Aljoscha


Re: Sliding Event Time Window Processing: Window Function inconsistent behavior

Posted by Sujit Sakre <su...@northgateps.com>.
Hi,

In the link (
http://stackoverflow.com/questions/41143518/sliding-processing-time-window-computes-inconsistent-results),
Fabian has mentioned that if Event Time is used, consistent results are
possible.

However, that's not the case with us. We are getting very random results.

Please suggest.
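[Editorial aside: the code of the BoundedOutOfOrdernessGenerator2 referenced above is not shown in the thread. Assuming it follows the usual bounded-out-of-orderness pattern, the watermark trails the highest timestamp seen so far by a fixed bound, and any record arriving behind the watermark belongs to windows that may already have fired without it, which would make the set of produced files vary between runs. A standalone sketch of that arithmetic (the 10-second bound is an assumption, not taken from the thread):]

```java
public class BoundedOutOfOrdernessSketch {

    static final long MAX_OUT_OF_ORDERNESS = 10_000L;  // assumed bound, in ms

    long currentMaxTimestamp = Long.MIN_VALUE;

    // Called for every record; also advances the max timestamp seen so far.
    long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    // The watermark trails the max timestamp by the out-of-orderness bound.
    long currentWatermark() {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch();
        gen.extractTimestamp(100_000L);
        // Watermark is now 90_000: a record with timestamp 85_000 arriving
        // next is late, and windows ending at or before 90_000 have already
        // been triggered without it.
        System.out.println(gen.currentWatermark());  // 90000
    }
}
```

[If late records turn out to be the cause, increasing the out-of-orderness bound, or using Flink's allowed-lateness mechanism on the window, is the usual remedy.]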


*Sujit Sakre*

