Posted to user@spark.apache.org by sandesh deshmane <sa...@gmail.com> on 2016/06/09 10:57:38 UTC

Error while using checkpointing. Spark Streaming 1.5.2 - DStream checkpointing has been enabled but the DStreams with their functions are not serialisable

Hi,

I am using Spark Streaming to stream data from Kafka 0.8.

I am using checkpointing in HDFS. I am getting an error like the one below:

java.io.NotSerializableException: DStream checkpointing has been enabled
but the DStreams with their functions are not serialisable

field (class:
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1,
name: foreachFunc$1, type: interface
org.apache.spark.api.java.function.Function)
- object (class
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1,
<function1>)
- field (class:
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,
name: cleanedF$1, type: interface scala.Function1)
- object (class
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,
<function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream,
org.apache.spark.streaming.dstream.ForEachDStream@333c4112)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type:
class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer,
ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@333c4112))
- writeObject data (class:
org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream,
org.apache.spark.streaming.kafka.KafkaInputDStream@5f989b88)
- writeObject data (class:
org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream,
org.apache.spark.streaming.kafka.KafkaInputDStream@36f6bc85)
- writeObject data (class:
org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files


In the foreachRDD call on my DStream I use code like the following:

foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Void call(JavaPairRDD<String, String> v1) throws Exception {
            // collect this batch to the driver and hand it to the helper
            Map<String, String> localMap = v1.collectAsMap();
            myfunction(localMap);
            return null;
        }
    });

myfunction writes the content of the local map to a file.
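
For reference, the streaming context and checkpointing are set up roughly like this (a simplified sketch; the app name, batch interval and checkpoint path below are illustrative, not the exact values I use):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

final String checkpointDir = "hdfs:///user/spark/checkpoints";  // illustrative path

JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        SparkConf conf = new SparkConf().setAppName("kafka-streaming");
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(10));
        // enabling checkpointing means the whole DStream graph, including the
        // foreachRDD functions, gets serialized into the checkpoint
        jssc.checkpoint(checkpointDir);
        // Kafka 0.8 input streams and the foreachRDD above are wired up here
        return jssc;
    }
};

JavaStreamingContext jssc =
        JavaStreamingContext.getOrCreate(checkpointDir, factory);
jssc.start();
jssc.awaitTermination();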


Thanks
Sandesh

Re: Error while using checkpointing. Spark Streaming 1.5.2 - DStream checkpointing has been enabled but the DStreams with their functions are not serialisable

Posted by sandesh deshmane <sa...@gmail.com>.
Hi,

I changed the code as below, but the error continues:

myUnionRdd.repartition(sparkNumberOfSlaves).foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Void call(JavaPairRDD<String, String> v1) throws Exception {
            // collect the batch to the driver
            Map<String, String> localMap = v1.collectAsMap();

            long currentTime = System.currentTimeMillis();
            String oldfileName = "/mnt/intermediate_data/output_" + currentTime + ".part";
            String fileName = "/mnt/intermediate_data/output_" + currentTime + ".csv";

            PrintWriter writer = null;
            try {
                writer = new PrintWriter(oldfileName, "UTF-8");
                // write one "key,value" line per map entry
                for (Map.Entry<String, String> entry : localMap.entrySet()) {
                    String key = entry.getKey();
                    String value = entry.getValue();
                    writer.println(key + "," + value);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (null != writer) {
                    writer.close();
                }
                // rename the finished .part file to .csv
                File part = new File(oldfileName);
                File csv = new File(fileName);
                part.renameTo(csv);
            }
            return null;
        }
    });
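
For reference, myUnionRdd is built roughly like this (a simplified sketch; jssc is the streaming context, and the ZooKeeper quorum, group id and topic name below are placeholders, not my real values):

import java.util.Collections;
import java.util.Map;

import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// two receiver-based Kafka 0.8 streams, unioned into one DStream
// (matching the two KafkaInputDStream instances in the stack trace)
Map<String, Integer> topics = Collections.singletonMap("mytopic", 1);

JavaPairReceiverInputDStream<String, String> stream1 =
        KafkaUtils.createStream(jssc, "zk-host:2181", "mygroup", topics);
JavaPairReceiverInputDStream<String, String> stream2 =
        KafkaUtils.createStream(jssc, "zk-host:2181", "mygroup", topics);

JavaPairDStream<String, String> myUnionRdd = stream1.union(stream2);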

On Thu, Jun 9, 2016 at 11:51 PM, Tathagata Das <ta...@gmail.com>
wrote:

> myFunction() is probably capturing unexpected things in the closure of the
> Function you have defined, because myFunction is defined outside. Try
> defining myFunction inside the Function and see if the problem persists.

Re: Error while using checkpointing. Spark Streaming 1.5.2 - DStream checkpointing has been enabled but the DStreams with their functions are not serialisable

Posted by Tathagata Das <ta...@gmail.com>.
myFunction() is probably capturing unexpected things in the closure of the
Function you have defined, because myFunction is defined outside. Try
defining myFunction inside the Function and see if the problem persists.
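
For example, something roughly like this (just a sketch built from the snippet you posted, with the file-writing body left out): either move the helper logic into the anonymous Function itself, or, as shown below, use a static nested class, which cannot hold a hidden reference to the enclosing instance:

import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;

// Declared inside your driver class. A static nested class has no hidden
// reference to the enclosing instance, so only the fields declared here
// need to be serializable.
static class WriteBatchToFile
        implements Function<JavaPairRDD<String, String>, Void> {

    private static final long serialVersionUID = 1L;

    private final String outputDir;  // a plain String, serializes fine

    WriteBatchToFile(String outputDir) {
        this.outputDir = outputDir;
    }

    @Override
    public Void call(JavaPairRDD<String, String> rdd) throws Exception {
        Map<String, String> localMap = rdd.collectAsMap();
        // put the file-writing logic from myFunction here (or in a private
        // method of this class), writing under outputDir
        return null;
    }
}

// usage:
// myUnionRdd.foreachRDD(new WriteBatchToFile("/mnt/intermediate_data"));

Either way, the important point is that nothing the Function refers to should belong to a class that is not serializable.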
