Posted to user@flink.apache.org by 马庆祥 <xy...@gmail.com> on 2017/09/06 13:41:16 UTC
Fwd: some question about side output
Hi, all:
Using Flink's side output feature
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html>,
we can get a stream of the data that was discarded as late. But when I use
the getSideOutput() method, I get the error message shown below.
This is the link to the relevant documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#getting-late-data-as-a-side-output
[image: inline image 2]
Note: the code is as follows:
[image: inline image 1]
Thanks & Regards
Qingxiang Ma
Re: Fwd: some question about side output
Posted by Biplob Biswas <re...@gmail.com>.
Change the type of the main stream from DataStream to
SingleOutputStreamOperator.
The getSideOutput() method is not part of the base class DataStream but of
its subclass SingleOutputStreamOperator.
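Why the declared type matters can be shown with a plain-Java analogy. BaseStream and OperatorStream below are hypothetical stand-ins, not Flink's classes: they only mirror the relationship between DataStream and its subclass SingleOutputStreamOperator, and this getSideOutput() is simplified (Flink's version takes an OutputTag). The Java compiler resolves a method call against the declared (static) type of the variable, so even when the object really is the subclass, the call does not compile through a base-class reference.

```java
import java.util.ArrayList;
import java.util.List;

public class StaticTypeDemo {

    // Hypothetical stand-in for DataStream: the base class, without getSideOutput().
    public static class BaseStream<T> {
        public final List<T> elements = new ArrayList<>();
    }

    // Hypothetical stand-in for SingleOutputStreamOperator: only the subclass declares getSideOutput().
    public static class OperatorStream<T> extends BaseStream<T> {
        private final List<Object> lateElements = new ArrayList<>();

        public void addLate(Object element) {
            lateElements.add(element);
        }

        // Simplified: Flink's real method takes an OutputTag; this one does not.
        public List<Object> getSideOutput() {
            return lateElements;
        }
    }

    // Like a windowed apply(), this returns the subclass type.
    public static OperatorStream<String> buildWindowedStream() {
        OperatorStream<String> stream = new OperatorStream<>();
        stream.elements.add("a");
        stream.addLate("b");
        return stream;
    }

    public static void main(String[] args) {
        BaseStream<String> asBase = buildWindowedStream();
        // asBase.getSideOutput();  // compile error: cannot find symbol (BaseStream has no getSideOutput)
        System.out.println("main via base type: " + asBase.elements);

        OperatorStream<String> asOperator = buildWindowedStream();
        System.out.println("side output: " + asOperator.getSideOutput());
    }
}
```

Applied to Flink, the fix is the one suggested here: keep the result of the windowed apply() in a SingleOutputStreamOperator<...> variable rather than a DataStream<...> one.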
Re: some question about side output
Posted by Chen Qin <qi...@gmail.com>.
Hi Qingxiang,
getSideOutput() is only available in the SingleOutputStreamOperator class. You might
consider changing the type of your windowed DataStream<...> to
SingleOutputStreamOperator<...>; it should then work fine.
Thanks,
Chen
Here is a code sample for reference:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;

public class SideOutputExample {

    public static void main(String[] argv) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Anonymous subclass ({}) so Flink can capture the side output's generic type.
        final OutputTag<Tuple2<String, Long>> lateTag =
                new OutputTag<Tuple2<String, Long>>("tag") {};

        // Declared as SingleOutputStreamOperator, not DataStream, so that
        // getSideOutput() is available below.
        SingleOutputStreamOperator<String> output = env
                .addSource(new SourceFunction<Tuple2<String, Long>>() {
                    @Override
                    public void run(SourceContext<Tuple2<String, Long>> sourceContext) throws Exception {
                        // Emit three events; "b" (timestamp 1) arrives after the watermark has reached 2.
                        synchronized (sourceContext.getCheckpointLock()) {
                            sourceContext.collect(new Tuple2<>("a", 0L));
                            sourceContext.collect(new Tuple2<>("c", 2L));
                            sourceContext.collect(new Tuple2<>("b", 1L));
                        }
                    }

                    @Override
                    public void cancel() {
                        // nothing to cancel: run() returns after emitting
                    }
                })
                .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>() {
                    @Nullable
                    @Override
                    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
                        // Emit a watermark equal to each element's timestamp.
                        return new Watermark(lastElement.f1);
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                        return element.f1;
                    }
                })
                .timeWindowAll(Time.milliseconds(1))
                .sideOutputLateData(lateTag)
                .apply(new AllWindowFunction<Tuple2<String, Long>, String, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable,
                                      Collector<String> collector) throws Exception {
                        for (Tuple2<String, Long> it : iterable) {
                            collector.collect(it.f0);
                        }
                    }
                });

        // print on-time events
        output.print();
        // print late-arriving events
        output.getSideOutput(lateTag).print();

        env.execute();
    }
}
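As a worked check of why "b" ends up on the side output, the following plain-Java sketch (no Flink dependency) replays the three events above. It assumes zero allowed lateness, tumbling 1 ms windows with zero offset, non-negative timestamps, and a watermark that advances to each element's timestamp after that element, as the punctuated assigner in the example does. The rule used, "late when the window's maxTimestamp() + allowedLateness <= current watermark", is assumed here to mirror the check Flink's window operator performs; the helper names isLate and lateKeys are hypothetical. Trace: "a" (ts 0) meets no watermark yet and is on time, so the watermark becomes 0; "c" (ts 2) has window max 2 > 0 and is on time, so the watermark becomes 2; "b" (ts 1) has window [1, 2) with max 1 <= 2, so it is late.

```java
import java.util.ArrayList;
import java.util.List;

public class LatenessSimulation {

    // Hypothetical helper: is an element with this timestamp late for a tumbling
    // window of size windowSize (zero offset, non-negative timestamps)?
    public static boolean isLate(long timestamp, long windowSize, long allowedLateness, long currentWatermark) {
        long windowStart = timestamp - (timestamp % windowSize);
        long maxTimestamp = windowStart + windowSize - 1; // last timestamp that belongs to the window
        return maxTimestamp + allowedLateness <= currentWatermark;
    }

    // Replays events in arrival order and returns the keys that would go to the late side output.
    public static List<String> lateKeys(String[] keys, long[] timestamps, long windowSize, long allowedLateness) {
        List<String> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE; // no watermark seen yet
        for (int i = 0; i < keys.length; i++) {
            if (isLate(timestamps[i], windowSize, allowedLateness, watermark)) {
                late.add(keys[i]);
            }
            // Punctuated assigner: after each element the watermark moves to its timestamp (never backwards).
            watermark = Math.max(watermark, timestamps[i]);
        }
        return late;
    }

    public static void main(String[] args) {
        // Same arrival order as the Flink example: a@0, c@2, b@1.
        List<String> late = lateKeys(new String[] {"a", "c", "b"}, new long[] {0L, 2L, 1L}, 1L, 0L);
        System.out.println("late: " + late);
    }
}
```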