Posted to user@spark.apache.org by sandesh deshmane <sa...@gmail.com> on 2016/06/09 10:57:38 UTC
Error while using checkpointing . Spark streaming 1.5.2- DStream
checkpointing has been enabled but the DStreams with their functions are not serialisable
Hi,
I am using Spark Streaming 1.5.2 to stream data from Kafka 0.8, with
checkpointing to HDFS. I am getting the error below:
java.io.NotSerializableException: DStream checkpointing has been enabled
but the DStreams with their functions are not serialisable
field (class:
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1,
name: foreachFunc$1, type: interface
org.apache.spark.api.java.function.Function)
- object (class
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1,
<function1>)
- field (class:
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,
name: cleanedF$1, type: interface scala.Function1)
- object (class
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,
<function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream,
org.apache.spark.streaming.dstream.ForEachDStream@333c4112)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type:
class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer,
ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@333c4112))
- writeObject data (class:
org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream,
org.apache.spark.streaming.kafka.KafkaInputDStream@5f989b88)
- writeObject data (class:
org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream,
org.apache.spark.streaming.kafka.KafkaInputDStream@36f6bc85)
- writeObject data (class:
org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
In foreachRDD on my DStream I use code like the following:

foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Void call(JavaPairRDD<String, String> v1) throws Exception {
            Map<String, String> localMap = v1.collectAsMap();
            myfunction(localMap);
            return null;
        }
    });

myfunction writes the contents of the local map to a file.
Thanks
Sandesh
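[Editor's note: the trace above is typical of a Java closure dragging its enclosing object into serialization. A minimal, Spark-free sketch of the mechanism, with all names hypothetical: an anonymous inner class that touches any member of its enclosing instance keeps a hidden reference to it, so serializing the "function" also tries to serialize the outer (non-serializable) driver object, whereas a static nested class carries no such reference.]

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCapture {
    interface Fn extends Serializable { String call(); }

    // Not Serializable, like a typical driver/main class.
    static class Driver {
        String name = "driver";

        Fn makeFn() {
            // Anonymous inner class: reading `name` forces capture of Driver.this.
            return new Fn() {
                public String call() { return name; }
            };
        }
    }

    // Static nested class: no hidden reference to any enclosing instance.
    static class SafeFn implements Fn {
        public String call() { return "safe"; }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // a captured reference was not serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Driver().makeFn())); // false: drags Driver along
        System.out.println(serializes(new SafeFn()));          // true
    }
}
```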
Re: Error while using checkpointing . Spark streaming 1.5.2- DStream
checkpointing has been enabled but the DStreams with their functions are not serialisable
Posted by sandesh deshmane <sa...@gmail.com>.
Hi,
I changed the code as below, moving the file-writing logic inside the
Function, but the error continues:
myUnionRdd.repartition(sparkNumberOfSlaves).foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Void call(JavaPairRDD<String, String> v1) throws Exception {
            Map<String, String> localMap = v1.collectAsMap();
            long currentTime = System.currentTimeMillis();
            String oldFileName = "/mnt/intermediate_data/output_" + currentTime + ".part";
            String fileName = "/mnt/intermediate_data/output_" + currentTime + ".csv";
            PrintWriter writer = null;
            try {
                writer = new PrintWriter(oldFileName, "UTF-8");
                for (Map.Entry<String, String> entry : localMap.entrySet()) {
                    writer.println(entry.getKey() + "," + entry.getValue());
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (writer != null) {
                    writer.close();
                }
                File part = new File(oldFileName);
                File csv = new File(fileName);
                part.renameTo(csv);
            }
            return null;
        }
    });
On Thu, Jun 9, 2016 at 11:51 PM, Tathagata Das <ta...@gmail.com>
wrote:
> myFunction() is probably capturing unexpected things in the closure of the
> Function you have defined, because myFunction is defined outside. Try
> defining the myFunction inside the Function and see if the problem persists.
>
Re: Error while using checkpointing . Spark streaming 1.5.2- DStream
checkpointing has been enabled but the DStreams with their functions are not serialisable
Posted by Tathagata Das <ta...@gmail.com>.
myFunction() is probably capturing unexpected things in the closure of the
Function you have defined, because myFunction is defined outside. Try
defining myFunction inside the Function and see if the problem persists.
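[Editor's note: a Spark-free sketch of this suggestion, with all names hypothetical. A serializable lambda that calls an instance method defined outside must capture the enclosing (non-serializable) object; one whose logic lives inside and touches only local variables captures nothing problematic and serializes cleanly.]

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureFix {
    interface Fn extends Serializable { String call(); }

    // Not Serializable, like the driver program.
    static class App {
        String myfunction() { return "out"; } // instance method defined outside the Fn

        Fn broken() {
            // Calling myfunction() forces the lambda to capture App.this.
            return () -> myfunction();
        }

        Fn fixed() {
            // Logic defined "inside": only a local String is captured.
            String local = "in";
            return () -> local + "!";
        }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // a captured reference was not serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        App app = new App();
        System.out.println(serializes(app.broken())); // false: App.this is in the closure
        System.out.println(serializes(app.fixed()));  // true: captures only a String
    }
}
```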