Posted to issues@flink.apache.org by "Bob Lau (JIRA)" <ji...@apache.org> on 2018/04/26 01:46:00 UTC

[jira] [Created] (FLINK-9259) The implementation of the SourceFunction is not serializable.

Bob Lau created FLINK-9259:
------------------------------

             Summary: The implementation of the SourceFunction is not serializable. 
                 Key: FLINK-9259
                 URL: https://issues.apache.org/jira/browse/FLINK-9259
             Project: Flink
          Issue Type: Bug
          Components: DataStream API, Table API & SQL
    Affects Versions: 1.5.0
            Reporter: Bob Lau


The exception stack is as follows:
{code:java}
org.apache.flink.api.common.InvalidProgramException: The implementation of the SourceFunction is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1472)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1416)
    at com.xxxx.tysc.job.source.RowDataStreamSpecifyTableSource.getDataStream(RowDataStreamSpecifyTableSource.java:40)
    at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:95)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
    at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:782)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
    at com.xxxx.tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:338)
    at com.xxxx.tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:447)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 21 more
{code}
I have implemented the Serializable interface in my SourceFunction implementation.

The code is as follows:

 
{code:java}
@Override
public void run(SourceContext<Row> ctx) throws Exception {
    stream.map(new MapFunction<Row, Row>() {
        private static final long serialVersionUID = -1723722950731109198L;

        @Override
        public Row map(Row input) throws Exception {
            ctx.collect(input);
            return null;
        }
    });
}
{code}
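
The "Caused by" line in the trace names SingleOutputStreamOperator, which suggests that the `stream` field used in `run()` is the part Java serialization rejects, even though the enclosing class declares Serializable. The following is a minimal sketch of that behaviour using only java.io, not Flink. `StreamHandle`, `BrokenSource`, `FixedSource`, and `isSerializable` are hypothetical names introduced for illustration; `StreamHandle` merely stands in for a non-serializable object such as a DataStream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationSketch {

    // Hypothetical stand-in for a non-serializable object (assumption, not a Flink class).
    static class StreamHandle {
    }

    // Declares Serializable, but holds a field whose type is not serializable.
    static class BrokenSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final StreamHandle stream = new StreamHandle();
    }

    // Same shape, but the field is transient, so default serialization skips it.
    static class FixedSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient StreamHandle stream = new StreamHandle();
    }

    // Returns true if the object can be written with default Java serialization.
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when the object graph reaches a non-serializable, non-transient field.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("BrokenSource serializable: " + isSerializable(new BrokenSource()));
        System.out.println("FixedSource serializable: " + isSerializable(new FixedSource()));
    }
}
```

In this sketch the first check reports false and the second true. Note that a transient field comes back as null after deserialization, so marking `stream` transient in the real job would probably only get past the serialization check; the source would still need to obtain its data inside `run()` by some other means.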


