Posted to user@flink.apache.org by subash basnet <ya...@gmail.com> on 2016/04/19 14:32:56 UTC

adding source not serializable exception in streaming implementation

Hello all,

My requirement is to re-read a CSV file from a file path at fixed time
intervals and process the CSV data. The CSV file is updated at regular
intervals.
Below is my code:
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
// getCsvDataStream(...) is my own helper (not shown)
DataStream<String> dataStream = getCsvDataStream(see);
DataStream<Stock> edits = see.addSource(new FetchStock("path/to/csv"));

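In FetchStock.java:
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class FetchStock extends RichSourceFunction<Stock> {
    // path of the CSV file to poll (declaration implied by the constructor)
    private final String csvPath;

    public FetchStock(String csvPath) {
        this.csvPath = csvPath;
    }
    // run(), cancel() and any other members were not included in the post
}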
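The requirement above (re-read the CSV on a timer) can be sketched in plain Java, without Flink on the classpath. This is a hypothetical illustration, not the poster's FetchStock: the class name PeriodicCsvReader, the comma-only parsing (no quoted fields), and the Consumer-based output are assumptions. The point it shows is that the polling object needs to carry only serializable state: a path, an interval, and a running flag.

```java
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java sketch of the polling loop a FetchStock-style source could run.
// It holds only serializable state: the path, the interval and a running flag.
class PeriodicCsvReader implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String csvPath;
    private final long intervalMillis;
    private volatile boolean running = true;

    PeriodicCsvReader(String csvPath, long intervalMillis) {
        this.csvPath = csvPath;
        this.intervalMillis = intervalMillis;
    }

    // Reads the whole file once and splits each non-blank line on commas (assumption: no quoting).
    List<String[]> readOnce() {
        try {
            List<String[]> rows = new ArrayList<>();
            for (String line : Files.readAllLines(Paths.get(csvPath), StandardCharsets.UTF_8)) {
                if (!line.trim().isEmpty()) {
                    rows.add(line.split(","));
                }
            }
            return rows;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Re-reads the file every intervalMillis until cancel() is called.
    // The running flag is checked once per pass, so a pass in progress is finished first.
    void run(Consumer<String[]> out) {
        while (running) {
            for (String[] row : readOnce()) {
                out.accept(row);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Plays the role of a source's cancel(): stops the loop after the current pass.
    void cancel() {
        running = false;
    }
}
```

In an actual Flink job, the same loop would presumably go into FetchStock.run(SourceContext<Stock> ctx), emitting records with ctx.collect(...) and having cancel() clear the flag. The StreamExecutionEnvironment stays in main() and never becomes a field of the source.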

I am trying to adapt code from WikipediaAnalysis, but adding the source
fails with the following "not serializable" exception:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object wikiedits.FetchStock@d7b1517 not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1075)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1057)
    at wikiedits.StockAnalysis.main(StockAnalysis.java:30)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.environment.LocalStreamEnvironment
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
    ... 6 more


I have attached Stock.java, which is just a model class with getters and
setters. I am not sure what I am doing wrong.

Best Regards,
Subash Basnet

Re: adding source not serializable exception in streaming implementation

Posted by Till Rohrmann <tr...@apache.org>.
I assume that the provided FetchStock code is not complete. As the
exception indicates, you somehow store a LocalStreamEnvironment in your
source function. StreamExecutionEnvironments are not serializable and
cannot be part of the source function's closure.
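Till's point can be reproduced with plain Java serialization, which is roughly what ClosureCleaner.ensureSerializable does through InstantiationUtil.serializeObject in the trace above. In this sketch, FakeEnvironment is a hypothetical stand-in for LocalStreamEnvironment, and BadSource/GoodSource are illustrative names, not Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationCheck {

    // Hypothetical stand-in for LocalStreamEnvironment: deliberately NOT Serializable.
    static class FakeEnvironment {
    }

    // Mirrors the failing case: a Serializable function object that keeps a reference to the environment.
    static class BadSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String csvPath;
        private final FakeEnvironment env; // this field triggers NotSerializableException

        BadSource(String csvPath, FakeEnvironment env) {
            this.csvPath = csvPath;
            this.env = env;
        }
    }

    // The fix: keep only serializable configuration (here, the path string).
    static class GoodSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String csvPath;

        GoodSource(String csvPath) {
            this.csvPath = csvPath;
        }
    }

    // Rough equivalent of the closure check: try to Java-serialize the object into memory.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        FakeEnvironment env = new FakeEnvironment();
        System.out.println(isSerializable(new BadSource("path/to/csv", env))); // false
        System.out.println(isSerializable(new GoodSource("path/to/csv")));     // true
    }
}
```

Flink functions such as RichSourceFunction are already Serializable, so any non-serializable field they hold fails in the same way. A non-static inner or anonymous class can also pull in an enclosing object that holds the environment through its implicit outer reference.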

Cheers,
Till
