You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Naveen Dabas <na...@ymail.com> on 2015/07/15 16:40:23 UTC

Job aborted due to stage failure: Task not serializable:



  

 I am using the code below with the Kryo serializer. When I run it, I get the error "Task not serializable" on the commented line. 2) How are broadcast variables treated on the executors? Are they local variables, or can they be used in any function, like global variables?
/** Filters a couple of sample lines against substrings loaded from a filter file. */
object StreamingLogInput {

  /**
   * Reads the filter substrings from `/user/nadabas/filters/fltr`, broadcasts
   * them, and collects the sample lines that contain at least one of them.
   *
   * @param args command-line arguments; currently ignored
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingLogInput")
    val sc = new SparkContext(conf)

    val lines = sc.parallelize(List("eoore is test", "testing is error report"))
    // Streaming variant of the input, kept for reference:
    // val ssc = new StreamingContext(sc, Seconds(30))
    // val lines = ssc.socketTextStream("localhost", 7777)

    // collect() already yields an Array[String], so it is broadcast as-is.
    val filters = sc.textFile("/user/nadabas/filters/fltr").collect()
    val broadcastVar = sc.broadcast(filters)

    // Streaming equivalent of the filter below:
    // val out = lines.transform { rdd => rdd.filter(x => fil(broadcastVar.value, x)) }
    val out = lines.filter(x => fil(broadcastVar.value, x))

    out.collect()
  }

  /**
   * Tells whether `y1` contains at least one of the substrings in `x1`.
   *
   * @param x1 substrings to look for
   * @param y1 text to search
   * @return `true` if any element of `x1` occurs in `y1`; `false` otherwise,
   *         including when `x1` is empty
   */
  def fil(x1: Array[String], y1: String): Boolean =
    x1.exists(s => y1.contains(s))
}

   

Re: Job aborted due to stage failure: Task not serializable:

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Did you try this?

// Predicate inlined into the closure: it uses only its own argument and the
// broadcast handle, and calls no method of the enclosing object.
val out = lines.filter { line =>
  val patterns = broadcastVar.value
  // Keep the line when at least one broadcast substring occurs in it.
  patterns.exists(p => line.contains(p))
}


Thanks
Best Regards

On Wed, Jul 15, 2015 at 8:10 PM, Naveen Dabas <na...@ymail.com> wrote:

>
>
>
>
>
> I am using the code below with the Kryo serializer. When I run it, I get
> the error "Task not serializable" on the commented line.
> 2) How are broadcast variables treated on the executors? Are they local
> variables, or can they be used in any function, like global variables?
>
> object StreamingLogInput {
>   def main(args: Array[String]) {
>     val master = args(0)
>     val conf = new SparkConf().setAppName("StreamingLogInput")
>     // Create a StreamingContext with a 1 second batch size
>
>     val sc = new SparkContext(conf)
>     val lines=sc.parallelize(List("eoore is test","testing is error
> report"))
>     //val ssc = new StreamingContext(sc, Seconds(30))
>     //val lines = ssc.socketTextStream("localhost", 7777)
>     val filter=sc.textFile("/user/nadabas/filters/fltr")
>     val filarr=filter.collect().toArray
>     val broadcastVar = sc.broadcast(filarr)
>
>     // val
> out=lines.transform{rdd=>rdd.filter(x=>fil(broadcastVar.value,x))}
>
> *val out=lines.filter(x=>fil(broadcastVar.value,x))  //error is coming*
>
>     out.collect()
>
>   }
>   def fil(x1:Array[String],y1:String)={
>     val y=y1
>  // val x=broadcastVar.value
>     val x=x1
>   var flag:Boolean=false
>      for(a<-x)
>   {
>     if(y.contains(a))
>     flag=true
>     }
>     flag
>     }
>    }
>
>
>