Posted to user@flink.apache.org by AndreaKinn <ki...@hotmail.it> on 2017/10/14 23:00:55 UTC

Problems with window function

Hi all,
I'm trying to order the elements of a stream by time using a window
function. My goal is to sort the elements inside each tumbling window.

This is my code (written following the doc):

DataStream<Harness.KafkaRecord> LCxAccStream = env
    .addSource(new FlinkKafkaConsumer010<>("LCacc", new CustomDeserializer(), properties)).setParallelism(4)
    .assignTimestampsAndWatermarks(new CustomTimestampExtractor()).setParallelism(4)
    .map(new MapFunction<Tuple8<String, String, Date, String, String, Double, Double, Double>, Harness.KafkaRecord>() {

        @Override
        public Harness.KafkaRecord map(Tuple8<String, String, Date, String, String, Double, Double, Double> value) throws Exception {
            return new Harness.KafkaRecord(value.f0, value.f1, value.f2, value.f3, value.f4, value.f5);
        }
    }).setParallelism(4)
    .keyBy("key")
    .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
    .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {

        public void apply(String key,
                TimeWindow window,
                Iterable<Harness.KafkaRecord> input,
                Collector<Harness.KafkaRecord> out)
                throws Exception {

            ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();

            for (Harness.KafkaRecord in : input)
                list.add(in);
            Collections.sort(list);
            for (Harness.KafkaRecord output : list)
                out.collect(output);
        }
    });

Of course, I have defined a comparator for the Harness.KafkaRecord object.
Unfortunately, the .apply(...) call shows the following error:

    The method apply(WindowFunction<Harness.KafkaRecord,R,Tuple,TimeWindow>) in
    the type WindowedStream<Harness.KafkaRecord,Tuple,TimeWindow> is not
    applicable for the arguments (new
    WindowFunction<Harness.KafkaRecord,Harness.KafkaRecord,String,TimeWindow>(){})

Honestly, I don't understand why I can't use String instead of Tuple. My
key type is a String, and I also can't understand what the type Tuple means
in this case.

Furthermore, I noticed that the example here: WindowFunction - The Generic
Case
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#keyed-vs-non-keyed-windows>
uses a String type as the key of the KeyedStream.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Problems with window function

Posted by AndreaKinn <ki...@hotmail.it>.
KeySelector was exactly what I needed. Thanks a lot.
I modified my code this way and now it works:

DataStream<Harness.KafkaRecord> LCxAccStream = env
    .addSource(new FlinkKafkaConsumer010<>("LCacc", new CustomDeserializer(), properties)).setParallelism(4)
    .assignTimestampsAndWatermarks(new CustomTimestampExtractor()).setParallelism(4)
    .map(new MapFunction<Tuple8<String, String, Date, String, String, Double, Double, Double>, Harness.KafkaRecord>() {

        @Override
        public Harness.KafkaRecord map(Tuple8<String, String, Date, String, String, Double, Double, Double> value) throws Exception {
            return new Harness.KafkaRecord(value.f0, value.f1, value.f2, value.f3, value.f4, value.f5);
        }
    }).setParallelism(4)
    .keyBy(new KeySelector<Harness.KafkaRecord, String>() {
        public String getKey(Harness.KafkaRecord record) { return record.key; }
    })
    .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
    .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {

        public void apply(String key,
                TimeWindow window,
                Iterable<Harness.KafkaRecord> input,
                Collector<Harness.KafkaRecord> out)
                throws Exception {

            ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();

            for (Harness.KafkaRecord in : input)
                list.add(in);
            Collections.sort(list);
            for (Harness.KafkaRecord output : list)
                out.collect(output);
        }
    });
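
A note on the sorting step: the one-argument Collections.sort(list) only
compiles when the element type implements Comparable. Harness.KafkaRecord is
not shown in this thread, so the following is only a minimal plain-Java
sketch of what that could look like. The Record class, its timestamp field,
and the sortWindow helper are hypothetical stand-ins, and ordering by
timestamp is an assumption based on the stated goal of time ordering.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;

public class WindowSortSketch {

    // Hypothetical stand-in for Harness.KafkaRecord (the real class is not shown).
    static final class Record implements Comparable<Record> {
        final String key;
        final Date timestamp; // assumed field name

        Record(String key, Date timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        // Natural ordering by timestamp, which is what Collections.sort(list) uses.
        @Override
        public int compareTo(Record other) {
            return timestamp.compareTo(other.timestamp);
        }
    }

    // Same buffer-then-sort logic as the body of apply(...), without Flink types.
    static List<Record> sortWindow(Iterable<Record> input) {
        List<Record> list = new ArrayList<>();
        for (Record in : input) {
            list.add(in);
        }
        Collections.sort(list);
        return list;
    }

    public static void main(String[] args) {
        List<Record> window = new ArrayList<>();
        window.add(new Record("a", new Date(3000L)));
        window.add(new Record("a", new Date(1000L)));
        window.add(new Record("a", new Date(2000L)));

        // Print the timestamps in sorted order.
        for (Record r : sortWindow(window)) {
            System.out.println(r.timestamp.getTime());
        }
    }
}
```

If the record class cannot implement Comparable, passing an explicit
Comparator to list.sort(...) would be the usual alternative.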




Re: Problems with window function

Posted by Tony Wei <to...@gmail.com>.
Hi Andrea,

AFAIK, the `keyBy` function you used wraps all the keys you selected into a
`Tuple`. You can read your key from it, e.g. with `getField(0)`, and the
value you get will be a `String`.
If you want the KeyedStream to have String as its key type, you can pass a
`KeySelector` to keyBy. [1]
Hope this helps.

Best Regards,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#keyBy-org.apache.flink.api.java.functions.KeySelector-
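
To illustrate why the compiler insists on Tuple in the first version, here
is a small plain-Java model of the two keyBy variants. It does not use Flink
at all: Tuple, Keyed, Record, keyByFieldName and keyBySelector are
hypothetical stand-ins written only for this sketch, and they just mimic how
the key type parameter is decided in each case.

```java
import java.util.function.Function;

public class KeyTypeSketch {

    // Stand-in for an untyped tuple: field types are only known at runtime.
    static final class Tuple {
        private final Object[] fields;

        Tuple(Object... fields) {
            this.fields = fields;
        }

        @SuppressWarnings("unchecked")
        <T> T getField(int pos) {
            return (T) fields[pos];
        }
    }

    // Stand-in for a keyed stream; K is the key type the compiler sees.
    static final class Keyed<T, K> {
        private final T element;
        private final Function<T, K> keyOf;

        Keyed(T element, Function<T, K> keyOf) {
            this.element = element;
            this.keyOf = keyOf;
        }

        K key() {
            return keyOf.apply(element);
        }
    }

    static final class Record {
        final String key;

        Record(String key) {
            this.key = key;
        }
    }

    // Models keyBy("key"): the field is named by a string, so the compiler
    // cannot know its type and the key is exposed as a generic Tuple.
    static Keyed<Record, Tuple> keyByFieldName(Record r) {
        return new Keyed<>(r, rec -> new Tuple(rec.key));
    }

    // Models keyBy(KeySelector): the selector's return type becomes K.
    static <K> Keyed<Record, K> keyBySelector(Record r, Function<Record, K> selector) {
        return new Keyed<>(r, selector);
    }

    public static void main(String[] args) {
        Record r = new Record("sensor-1");

        // Key arrives wrapped; it has to be unpacked from the tuple.
        Tuple wrapped = keyByFieldName(r).key();
        String viaTuple = wrapped.getField(0);

        // Key arrives directly as a String.
        String direct = keyBySelector(r, rec -> rec.key).key();

        System.out.println(viaTuple + " " + direct);
    }
}
```

In this model, a function declared for a String key cannot accept
Keyed<Record, Tuple>, which is the same kind of mismatch the error message
in the original post reports.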
