Posted to user@flink.apache.org by Navneeth Krishnan <re...@gmail.com> on 2018/04/03 17:55:12 UTC
SideOutput Issue
Hi All,
I'm having issues creating side outputs. There are two input sources
(both from Kafka) which are connected and fed into a co-process
function. Inside the co-process function, the regular data stream outputs a POJO, and
in processElement2 there is a periodic timer which creates the side output.
When I start the job, I get the exception below. Is there something I'm
doing wrong?
I based the side output on the following example:
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
Side output tag:
final OutputTag<POJO> sideOutputTag = new OutputTag<POJO>("side-output"){};
processElement2:
ctx.output(sideOutputTag, pojo);
Job:
dataStream.getSideOutput(sideOutputTag).print();
2018-04-03 10:18:38.821 [Co-Flat Map (4/8)] INFO org.apache.flink.runtime.taskmanager.Task - Co-Flat Map (4/8) (20b92b7a8cdd1e63963886de0895882c) switched from CREATED to DEPLOYING.
2018-04-03 10:18:38.821 [Co-Process (1/8)] INFO org.apache.flink.runtime.taskmanager.Task - Co-Process (1/8) (fd8f971eea2e103e340d2955b384eaa3) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
    at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
    at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
2018-04-03 10:18:38.880 [Co-Process (7/8)] INFO org.apache.flink.runtime.taskmanager.Task - Co-Process (7/8) (a86274f9ac49b71f00d218a1533cbd51) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
    at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
    at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Thanks
Re: SideOutput Issue
Posted by Chesnay Schepler <ch...@apache.org>.
We were able to reproduce the issue.
It was caused by calling both getSideOutput() and split() on the same
DataStream, which Flink does not handle properly.
As a workaround, you can add a no-op map function before the split() call.
I've filed FLINK-9141 <https://issues.apache.org/jira/browse/FLINK-9141>.
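A minimal sketch of the workaround described above, assuming a Flink version in which DataStream#split() still exists (split() was later deprecated and removed in favor of side outputs). The class name SplitWorkaroundSketch, the CollectSink test sink, the tag name and the routing rule in the OutputSelector are illustrative assumptions, not code from this thread. The key step is the identity map: split() is called on the mapped stream instead of on the co-process operator whose side output is read.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SplitWorkaroundSketch {

    // The trailing {} creates an anonymous subclass so Flink can capture the tag's type.
    static final OutputTag<String> SIDE = new OutputTag<String>("side-output") {};

    // Hypothetical test sink: collects results in a static list (works for local, single-JVM runs).
    public static class CollectSink implements SinkFunction<String> {
        static final List<String> VALUES = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value) {
            VALUES.add(value);
        }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<String> process = env.fromElements("a", "b")
            .connect(env.fromElements("x"))
            .process(new CoProcessFunction<String, String, String>() {
                @Override
                public void processElement1(String value, Context ctx, Collector<String> out) {
                    out.collect("main:" + value);      // regular (main) output
                }

                @Override
                public void processElement2(String value, Context ctx, Collector<String> out) {
                    ctx.output(SIDE, "side:" + value); // side output
                }
            });

        // The side output is read directly from the co-process operator.
        process.getSideOutput(SIDE).addSink(new CollectSink());

        // Workaround: a no-op map, so split() is not called on the operator used for getSideOutput().
        DataStream<String> mainCopy = process.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value;
            }
        });

        mainCopy.split(new OutputSelector<String>() {
                @Override
                public Iterable<String> select(String value) {
                    return Collections.singletonList(value.equals("main:a") ? "a" : "other");
                }
            })
            .select("a")
            .addSink(new CollectSink());

        env.execute("split + side output workaround");
        return new ArrayList<>(CollectSink.VALUES);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Run locally, this should collect "side:x" from the side output and "main:a" from the selected split branch (in no guaranteed order); "main:b" is routed to "other", which is never selected.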
On 04.04.2018 12:21, Chesnay Schepler wrote:
Re: SideOutput Issue
Posted by Chesnay Schepler <ch...@apache.org>.
Hi,
Which version of Flink are you using?
Could you provide us with an example that reproduces the issue? I tried to
reproduce it with the following code, based on the information you provided,
but it runs fine for me:
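import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputRepro {

    // The trailing {} creates an anonymous subclass so Flink can capture the tag's type.
    private static final OutputTag<String> tag = new OutputTag<String>("test") {};

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text1 = env.fromElements("foo");
        DataStream<String> text2 = env.fromElements("bar");

        // Connect both streams and process them with a (no-op) CoProcessFunction.
        SingleOutputStreamOperator<String> process = text1.connect(text2)
            .process(new CoProcessFunction<String, String, String>() {
                @Override
                public void processElement1(String value, Context ctx,
                        Collector<String> out) throws Exception {
                }

                @Override
                public void processElement2(String value, Context ctx,
                        Collector<String> out) throws Exception {
                }
            });

        // Read the side output from the co-process operator.
        process.getSideOutput(tag).print();

        // execute program
        env.execute("Streaming WordCount");
    }
}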