Posted to user@flink.apache.org by 马庆祥 <xy...@gmail.com> on 2017/09/06 13:41:16 UTC

Fwd: some question about side output

Hi, all:

Using Flink’s side output feature
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html>,
we can get a stream of the data that was discarded as late. But when I use
the getSideOutput() method, I get the following error message:

This is the link to the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#getting-late-data-as-a-side-output

[image: inline image 2]

The code is as follows:
[image: inline image 1]

Thanks & Regards
Qingxiang Ma

Re: Fwd: some question about side output

Posted by Biplob Biswas <re...@gmail.com>.
Change the type of the main stream from DataStream to
SingleOutputStreamOperator.

The getSideOutput() method is not part of the base class DataStream but of
the subclass SingleOutputStreamOperator.
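The point about the declared type can be illustrated without Flink. Below is a minimal plain-Java sketch using two hypothetical stand-in classes, BaseStream and OperatorStream (not Flink's real classes), that model DataStream and SingleOutputStreamOperator: a method declared only on the subclass cannot be called through a variable whose declared type is the base class, even when the object at runtime is the subclass.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StaticTypeSketch {

    // Hypothetical stand-in for a base stream type such as DataStream
    static class BaseStream {
        void print() {
            System.out.println("printing main stream");
        }
    }

    // Hypothetical stand-in for a subclass such as SingleOutputStreamOperator:
    // it adds getSideOutput(), which the base type does not declare
    static class OperatorStream extends BaseStream {
        private final List<String> lateElements;

        OperatorStream(List<String> lateElements) {
            this.lateElements = new ArrayList<>(lateElements);
        }

        List<String> getSideOutput() {
            return lateElements;
        }
    }

    // Mirrors a window apply() call, which returns the subclass type
    static OperatorStream buildPipeline() {
        return new OperatorStream(Arrays.asList("b"));
    }

    public static void main(String[] args) {
        BaseStream asBase = buildPipeline();
        asBase.print();
        // asBase.getSideOutput(); // compile error: BaseStream declares no getSideOutput()

        OperatorStream asOperator = buildPipeline();
        System.out.println(asOperator.getSideOutput()); // prints [b]
    }
}
```

The object is the same in both cases; only the declared type of the variable decides which methods the compiler lets you call.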



Re: some question about side output

Posted by Chen Qin <qi...@gmail.com>.
Hi Qingxiang,

getSideOutput() is only available on the SingleOutputStreamOperator class.
Consider changing the type of your DataStream<...> window to
SingleOutputStreamOperator, and it should work fine.

Thanks,
Chen

Here is a handy code sample:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;

public class SideOutputExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // The trailing {} creates an anonymous subclass so the element type can be extracted
        final OutputTag<Tuple2<String, Long>> lateTag = new OutputTag<Tuple2<String, Long>>("tag") {};

        // apply() returns a SingleOutputStreamOperator, the type that offers getSideOutput()
        SingleOutputStreamOperator<String> output = env.addSource(new SourceFunction<Tuple2<String, Long>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Long>> sourceContext) throws Exception {
                // emit three events; "b" (timestamp 1) arrives after "c" (timestamp 2)
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(new Tuple2<String, Long>("a", 0L));
                    sourceContext.collect(new Tuple2<String, Long>("c", 2L));
                    sourceContext.collect(new Tuple2<String, Long>("b", 1L));
                }
            }

            @Override
            public void cancel() {
                // nothing to cancel: the source emits a fixed set of events and returns
            }
        }).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>() {
            // emit a watermark equal to each element's timestamp
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
                return new Watermark(lastElement.f1);
            }

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                return element.f1;
            }
        }).timeWindowAll(Time.milliseconds(1))
          .sideOutputLateData(lateTag) // route late elements to the side output
          .apply(new AllWindowFunction<Tuple2<String, Long>, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable,
                              Collector<String> collector) throws Exception {
                for (Tuple2<String, Long> it : iterable) {
                    collector.collect(it.f0);
                }
            }
        });

        // print on-time events
        output.print();

        // print late-arriving events
        output.getSideOutput(lateTag).print();

        env.execute();
    }
}
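Which of the three events ends up in the side output can be checked by hand. The following is a simplified plain-Java model (not Flink code) that assumes zero allowed lateness, 1 ms tumbling windows, and a watermark that advances to each element's timestamp after the element is processed, as the punctuated assigner in the sample does. It follows Flink's rule that an element is late when the maximum timestamp of its window (window end minus 1) is at or below the current watermark. The method name lateElements is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class LatenessModel {

    // Returns the keys of elements that would be routed to the late-data side output.
    // Assumes tumbling windows of windowSize ms, zero allowed lateness, and a watermark
    // advanced to each element's timestamp after that element is processed.
    static List<String> lateElements(String[] keys, long[] timestamps, long windowSize) {
        List<String> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE; // no watermark has been emitted yet
        for (int i = 0; i < keys.length; i++) {
            long ts = timestamps[i];
            long windowStart = ts - Math.floorMod(ts, windowSize);
            long windowMaxTimestamp = windowStart + windowSize - 1;
            if (windowMaxTimestamp <= watermark) {
                late.add(keys[i]);
            }
            watermark = Math.max(watermark, ts); // watermarks never move backwards
        }
        return late;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "c", "b"};
        long[] timestamps = {0L, 2L, 1L};
        System.out.println(lateElements(keys, timestamps, 1L)); // prints [b]
    }
}
```

With the sample's event order, only "b" is reported as late, because the watermark has already reached 2 when it arrives; "a" and "c" stay in the main output. If "b" were emitted before "c", the watermark would still be 0 at that point and the model would report no late elements.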


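The trailing {} in new OutputTag<Tuple2<String, Long>>("tag"){} matters because Java erases generic type arguments at runtime, while an anonymous subclass records them in its generic superclass signature. OutputTag relies on this to extract its element type when no TypeInformation is passed. A minimal plain-Java sketch of this "type token" pattern follows; TypeToken and capturedType() are hypothetical names written for illustration, not Flink's OutputTag.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeTokenSketch {

    // Hypothetical stand-in for a typed tag such as OutputTag<T>
    static class TypeToken<T> {
        final String id;

        TypeToken(String id) {
            this.id = id;
        }

        // Returns the captured type argument, or null when it was erased
        Type capturedType() {
            Type superType = getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                return ((ParameterizedType) superType).getActualTypeArguments()[0];
            }
            // plain instance: the generic superclass is Object, so T is lost
            return null;
        }
    }

    public static void main(String[] args) {
        TypeToken<String> plain = new TypeToken<>("plain");
        TypeToken<String> anonymous = new TypeToken<String>("anon") {};

        System.out.println(plain.capturedType());     // prints null
        System.out.println(anonymous.capturedType()); // prints class java.lang.String
    }
}
```

Without the braces the type argument is simply not recoverable at runtime, which is why the sample declares the tag as an anonymous subclass.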