You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Naveen Dabas <na...@ymail.com> on 2015/07/15 16:40:23 UTC
Job aborted due to stage failure: Task not serializable:
I am using the below code with the Kryo serializer. When I run this code I get this error: "Task not serializable" at the commented line. 2) How are broadcast variables treated in the executor? Are they local variables, or can they be used in any function, like global variables?
object StreamingLogInput { def main(args: Array[String]) { val master = args(0) val conf = new SparkConf().setAppName("StreamingLogInput") // Create a StreamingContext with a 1 second batch size val sc = new SparkContext(conf) val lines=sc.parallelize(List("eoore is test","testing is error report")) //val ssc = new StreamingContext(sc, Seconds(30)) //val lines = ssc.socketTextStream("localhost", 7777) val filter=sc.textFile("/user/nadabas/filters/fltr") val filarr=filter.collect().toArray val broadcastVar = sc.broadcast(filarr) // val out=lines.transform{rdd=>rdd.filter(x=>fil(broadcastVar.value,x))} val out=lines.filter(x=>fil(broadcastVar.value,x)) //error is coming out.collect() } def fil(x1:Array[String],y1:String)={ val y=y1 // val x=broadcastVar.value val x=x1 var flag:Boolean=false for(a<-x) { if(y.contains(a)) flag=true } flag } }
Re: Job aborted due to stage failure: Task not serializable:
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Did you try this?
*val out=lines.filter(xx=>{*
val y=xx
val x=broadcastVar.value
var flag:Boolean=false
for(a<-x)
{
if(y.contains(a))
flag=true
}
flag
}
*})*
Thanks
Best Regards
On Wed, Jul 15, 2015 at 8:10 PM, Naveen Dabas <na...@ymail.com> wrote:
>
>
>
>
>
> I am using the below code with the Kryo serializer. When I run this code
> I get this error: "Task not serializable" at the commented line.
> 2) How are broadcast variables treated in the executor? Are they local
> variables, or can they be used in any function, like global variables?
>
> object StreamingLogInput {
> def main(args: Array[String]) {
> val master = args(0)
> val conf = new SparkConf().setAppName("StreamingLogInput")
> // Create a StreamingContext with a 1 second batch size
>
> val sc = new SparkContext(conf)
> val lines=sc.parallelize(List("eoore is test","testing is error
> report"))
> //val ssc = new StreamingContext(sc, Seconds(30))
> //val lines = ssc.socketTextStream("localhost", 7777)
> val filter=sc.textFile("/user/nadabas/filters/fltr")
> val filarr=filter.collect().toArray
> val broadcastVar = sc.broadcast(filarr)
>
> // val
> out=lines.transform{rdd=>rdd.filter(x=>fil(broadcastVar.value,x))}
>
> *val out=lines.filter(x=>fil(broadcastVar.value,x)) //error is coming*
>
> out.collect()
>
> }
> def fil(x1:Array[String],y1:String)={
> val y=y1
> // val x=broadcastVar.value
> val x=x1
> var flag:Boolean=false
> for(a<-x)
> {
> if(y.contains(a))
> flag=true
> }
> flag
> }
> }
>
>
>