Posted to user@flink.apache.org by HungChang <un...@gmail.com> on 2016/02/25 11:16:56 UTC

The way to itearte instances in AllWindowFunction in current Master branch

Hi,

I would like to iterate over all the instances in a window (count the events in
the window and show the window's time range).

In 0.10.2 there is AllWindowFunction, which can be used to iterate over tuples:
public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, Serializable {
    void apply(W var1, Iterable<IN> var2, Collector<OUT> var3) throws Exception;
}

In the current master branch, AllWindowFunction can no longer iterate.
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java

What is the current way to iterate over the instances in a window?
I saw that there are ReduceAllWindowFunction and ReduceIterableAllWindowFunction,
but they are @Internal.

Best,

Hung



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Hung,
after some discussion the way that window functions are used will change back to the way it was in 0.10.x, i.e. the Iterable is always part of the apply function.

Sorry for the inconvenience this has caused.

Cheers,
Aljoscha


Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
yes, that seems to have been the issue. Math.max() is used to ensure that the timestamp never decreases, because a watermark is not allowed to decrease.
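
As a rough illustration of that point, here is a minimal stand-alone sketch (plain Java, no Flink dependency). The class name, the 5-second delay constant and the sample timestamps are assumptions made up for this example. It only shows that taking Math.max() over the event timestamps yields a watermark that never moves backwards, even when events arrive out of order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch, not Flink code: tracks the highest event
// timestamp seen so far and derives a watermark from it.
class MonotonicWatermarkSketch {

    // Assumed bound on out-of-orderness, in milliseconds.
    static final long MAX_DELAY_MS = 5_000L;

    // Returns the watermark that would be reported after each event.
    static List<Long> watermarks(long[] eventTimestamps) {
        List<Long> result = new ArrayList<>();
        // Start low, but leave room so the subtraction below cannot underflow.
        long maxSeen = Long.MIN_VALUE + MAX_DELAY_MS;
        for (long ts : eventTimestamps) {
            maxSeen = Math.max(maxSeen, ts); // never decreases
            result.add(maxSeen - MAX_DELAY_MS);
        }
        return result;
    }

    public static void main(String[] args) {
        // Out-of-order input: the third event is older than the second.
        long[] events = {10_000L, 20_000L, 15_000L, 30_000L};
        System.out.println(watermarks(events)); // prints [5000, 15000, 15000, 25000]
    }
}
```

The late third event leaves the watermark at 15000 instead of pulling it back to 10000.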

Cheers,
Aljoscha


Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by HungChang <un...@gmail.com>.
Ah! My incorrect code segment kept the watermark from moving forward, so it always
stayed at the same moment in the past. Is that right, and was that the issue?

Cheers,

Hung




Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by HungChang <un...@gmail.com>.
Many thanks, Aljoscha! It can now replay the computation on old instances, and the result
looks absolutely correct.

When printing currentTimestamp, I see values such as 1456480762777,
1456480762778, ..., which are not -1.
So I'm a bit confused about extractTimestamp().
Can I ask why
curTimeStamp = currentTimestamp and curTimeStamp = Math.max(curTimeStamp,
biz.time.getMillis())
make such a difference when replaying old instances (event_time <<
Time.now())? From my understanding, curTimeStamp affects
getCurrentWatermark() because it makes getCurrentWatermark() return a much
smaller value.

Cheers,

Hung




Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Hung,
I see one thing that could explain the problem. The timestamp assigner should look like this:

new AssignerWithPeriodicWatermarks<BizEvent>() {

    long curTimeStamp;

    @Override
    public long extractTimestamp(BizEvent biz, long currentTimestamp) {
        curTimeStamp = Math.max(curTimeStamp, biz.time.getMillis());
        return biz.time.getMillis();
    }

    @Override
    public long getCurrentWatermark() {
        return (curTimeStamp - (maxEventDelay * 1000));
    }
}

The currentTimestamp parameter is the internal timestamp that the element had before, which is most likely just “-1” because no timestamp was previously assigned.
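
To make the difference concrete, here is a small stand-alone sketch (plain Java, no Flink classes). The class and method names, the delay constant and the sample values are all hypothetical. It assumes the previous internal timestamp is -1, as guessed above. The first method mimics an assigner that stores the previous internal timestamp, the second one stores the maximum event time.

```java
// Hypothetical stand-alone sketch, not Flink code: compares the watermark
// produced by the two ways of updating curTimeStamp.
class AssignerComparisonSketch {

    // Assumed maximum out-of-orderness, in milliseconds.
    static final long MAX_DELAY_MS = 5_000L;

    // Variant that stores the element's previous internal timestamp.
    // The event times are ignored, so the watermark does not follow them.
    static long watermarkFromPreviousTimestamp(long[] eventTimes, long previousTimestamp) {
        long curTimeStamp = 0L;
        for (long ignored : eventTimes) {
            curTimeStamp = previousTimestamp;
        }
        return curTimeStamp - MAX_DELAY_MS;
    }

    // Variant that stores the highest event time seen so far.
    static long watermarkFromEventTime(long[] eventTimes) {
        long curTimeStamp = 0L;
        for (long eventTime : eventTimes) {
            curTimeStamp = Math.max(curTimeStamp, eventTime);
        }
        return curTimeStamp - MAX_DELAY_MS;
    }

    public static void main(String[] args) {
        long[] eventTimes = {1_000_000L, 1_060_000L, 1_120_000L};
        // Assumption: no timestamp was assigned before, so the previous value is -1.
        System.out.println(watermarkFromPreviousTimestamp(eventTimes, -1L)); // prints -5001
        System.out.println(watermarkFromEventTime(eventTimes));              // prints 1115000
    }
}
```

Whatever the previous internal timestamp is, it is unrelated to the event time, so only the second variant makes the watermark track the events.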

Does it work with that fix?

Cheers,
Aljoscha



Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by HungChang <un...@gmail.com>.
An update. The following situation works as expected: the data arrives after
the Flink job starts to execute.
1> (2016-02-25T17:46:25.00,13)
2> (2016-02-25T17:46:40.00,16)
3> (2016-02-25T17:46:50.00,11)
4> (2016-02-25T17:47:10.00,12)

But when the data arrived long before the job started, strange behavior appears. Does this
mean we cannot replay the computation?




Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by HungChang <un...@gmail.com>.
Thank you for your reply. Please let me know if other classes or the full code are
needed.

/**
 * Count how many total events
 */

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, env_config);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.setProperty("group.id", "test");
        properties.setProperty("client.id", "flink_test");
        properties.setProperty("auto.offset.reset", "earliest");

        final int maxEventDelay = 5; // events are out of order by max x seconds
        DataStream<BizEvent> bizs = env.addSource(new FlinkKafkaConsumer09<>(KAFKA_TOPIC,
                new BizSchema(), properties)).
                assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<BizEvent>() {

                    long curTimeStamp;

                    @Override
                    public long extractTimestamp(BizEvent biz, long currentTimestamp) {
                        curTimeStamp = currentTimestamp;
                        return biz.time.getMillis();
                    }

                    @Override
                    public long getCurrentWatermark() {
                        return (curTimeStamp - (maxEventDelay * 1000));
                    }
                });

        DataStream<Tuple2<BizEvent, Integer>> bizCnt = bizs.flatMap(new CountBiz());

        DataStream<Tuple2<String, Integer>> bizWindowTotal =
                bizCnt.timeWindowAll(Time.of(5, TimeUnit.MINUTES))
                      .apply(new SumStartTsAllWindow());

    // Output (start time of window, count)
    public static class SumStartTsAllWindow implements
            AllWindowFunction<Iterable<Tuple2<BizEvent, Integer>>,
            Tuple2<String, Integer>, TimeWindow> {

        private static DateTimeFormatter timeFormatter =
                DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").withLocale(Locale.GERMAN).
                        withZone(DateTimeZone.forID("Europe/Berlin"));

        @Override
        public void apply(TimeWindow timeWindow,
                          Iterable<Tuple2<BizEvent, Integer>> values,
                          Collector<Tuple2<String, Integer>> collector) throws Exception {

            DateTime startTs = new DateTime(timeWindow.getStart(),
                    DateTimeZone.forID("Europe/Berlin"));

            Iterator<Tuple2<BizEvent, Integer>> it = values.iterator();
            int sum = 0;
            while (it.hasNext()) {
                Tuple2<BizEvent, Integer> value = it.next();
                sum += value.f1;
            }
            collector.collect(new Tuple2<>(startTs.toString(timeFormatter), sum));
        }
    }

    // Output (BizEvent, 1)
    public static class CountBiz implements FlatMapFunction<BizEvent,
            Tuple2<BizEvent, Integer>> {

        @Override
        public void flatMap(BizEvent bizEvent, Collector<Tuple2<BizEvent, Integer>> collector) {
            //System.out.println("Time in count!: " + bizEvent.time);
            collector.collect(new Tuple2<>(bizEvent, (int) 1));
        }
    }




Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Hung,
could you maybe post a more complete snippet of your program? This would allow me to figure out why the output changes between versions 0.10 and 1.0.

@Matthias: The signature was changed to also allow window functions that don’t take an Iterable. For example, when doing WindowedStream.apply(ReduceFunction, WindowFunction) the window function only gets a single element. Before, this would be a single element inside an Iterable. Now the fact that it gets a single element is reflected in the signature.
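
A rough way to picture this, using stand-in types rather than the real Flink interfaces: WindowFn below is a hypothetical simplification of the window function signature, and java.util.function.Consumer stands in for Collector. The same interface can then be instantiated with an Iterable as its input type, or with a single pre-reduced element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-alone sketch, not the Flink API.
class WindowSignatureSketch {

    // Simplified stand-in: IN is whatever the window hands to the function.
    interface WindowFn<IN, OUT> {
        void apply(IN input, Consumer<OUT> out);
    }

    // IN = Iterable<Integer>: the function sees every element in the window.
    static final WindowFn<Iterable<Integer>, Integer> SUM_ALL = (values, out) -> {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        out.accept(sum);
    };

    // IN = Integer: the function sees one element that was reduced beforehand.
    static final WindowFn<Integer, String> DESCRIBE =
            (value, out) -> out.accept("reduced=" + value);

    // Runs a function on one input and gathers everything it emits.
    static <IN, OUT> List<OUT> run(WindowFn<IN, OUT> fn, IN input) {
        List<OUT> collected = new ArrayList<>();
        fn.apply(input, collected::add);
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(run(SUM_ALL, Arrays.asList(1, 2, 3))); // prints [6]
        System.out.println(run(DESCRIBE, 6));                     // prints [reduced=6]
    }
}
```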



Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by "Matthias J. Sax" <mj...@apache.org>.
Just out of curiosity: Why was it changed like this? Specifying
"Iterable<...>" as the type in AllWindowFunction seems rather unintuitive...

-Matthias



Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by HungChang <un...@gmail.com>.
Thank you. I can be sure this way is correct now.
I have tried this, but the windows are still not aggregating. Instead, the
AllWindowFunction behaves like a flatMap.
Shouldn't it emit only one output per window range? The strangest part is that the
first output is aggregated while the others are not.

1> (68,2016-02-18T12:00:00.00,2016-02-18T12:10:00.00)
1> (1,2016-02-18T12:00:00.00,2016-02-18T12:10:00.00)
1> (1,2016-02-18T12:00:00.00,2016-02-18T12:10:00.00)
1> (1,2016-02-18T12:00:00.00,2016-02-18T12:10:00.00)
....

When running version 0.10.2 the output is correct: the window times do
not overlap (I'm using a tumbling window).
1> (8,2016-02-18T12:00:00.00,2016-02-18T12:10:00.00)
1> (5,2016-02-18T12:10:00.00,2016-02-18T12:20:00.00)
1> (6,2016-02-18T12:20:00.00,2016-02-18T12:30:00.00)
1> (3,2016-02-18T12:30:00.00,2016-02-18T12:40:00.00)
....
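
For reference, the expected tumbling-window behavior can be sketched without Flink. This is only an illustration under stated assumptions: the class name, the sample timestamps and the simple alignment rule (window start = timestamp minus timestamp modulo window size, non-negative timestamps, no offset) are made up for this example. Each timestamp falls into exactly one window, so every window range is reported once with its count.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch, not Flink code: counts events per
// non-overlapping (tumbling) window.
class TumblingCountSketch {

    // Maps each window start to the number of timestamps that fall into it.
    // Assumes non-negative timestamps and windows aligned to multiples of the size.
    static Map<Long, Integer> countPerWindow(long[] timestamps, long windowSizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            long windowStart = ts - (ts % windowSizeMs);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long tenMinutes = 600_000L;
        long[] timestamps = {0L, 1_000L, 599_999L, 600_000L, 1_250_000L};
        System.out.println(countPerWindow(timestamps, tenMinutes));
        // prints {0=3, 600000=1, 1200000=1}
    }
}
```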

Perhaps I should look into other issues.

Best,

Hung




Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
yes that is true. The way you would now write such a function is this:

private static class MyIterableFunction implements AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> {
   private static final long serialVersionUID = 1L;

   @Override
   public void apply(
         TimeWindow window,
         Iterable<Tuple2<String, Integer>> values,
         Collector<Tuple2<String, Integer>> out) throws Exception {

   }
}

(I used Tuple2<String, Integer> as an example input type here.)

and then you can use it with AllWindowedStream.apply(new MyIterableFunction());




Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by HungChang <un...@gmail.com>.
Thank you for your reply.

The following in the current master does not look iterable, because the
parameter is IN rather than Iterable<IN>.
So I still have a problem iterating.

@Public
public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, Serializable {

	/**
	 * Evaluates the window and outputs none or several elements.
	 *
	 * @param window The window that is being evaluated.
	 * @param values The elements in the window being evaluated.
	 * @param out A collector for emitting elements.
	 *
	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
	 */
	void apply(W window, IN values, Collector<OUT> out) throws Exception;
}

https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java

Best,

Hung




Re: The way to itearte instances in AllWindowFunction in current Master branch

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Hung,
you are right, the generic parameters of AllWindowFunction changed from Iterable<IN> to IN. However, in the apply function on AllWindowedStream the parameter changed from IN to Iterable<IN>.

What this means is that you can still do:

windowed.apply(new MyIterableWindowFunction())

and iterate over the elements in the window.

I hope that helps but please let me know if I should go into more details.

Cheers,
Aljoscha