Posted to user@spark.apache.org by 萝卜丝炒饭 <14...@qq.com> on 2017/03/11 08:10:05 UTC

Re: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

I think the vals you defined are only valid on the driver; you can try a broadcast variable.
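
A minimal sketch of the broadcast approach, assuming streamingContext and
messages are the ones from your post (untested):

    // Broadcast the parameters once from the driver; executors then
    // read them via .value inside the closure. Case classes are
    // Serializable by default, so only this small object gets shipped.
    case class ParseParams(datetimeIdx: Int, datetimeLength: Int,
                           logLevelBeginIdx: Int, logLevelMaxLength: Int)

    val params = streamingContext.sparkContext.broadcast(
      ParseParams(datetimeIdx = 0, datetimeLength = 23,
                  logLevelBeginIdx = 24, logLevelMaxLength = 5))

    val lines = messages
      .filter(record => record.value().matches("\\d{4}.*"))
      .map { record =>
        val p = params.value   // look up the broadcast value on the executor
        val value = record.value()
        (record.topic(),
         value,
         value.substring(p.datetimeIdx, p.datetimeLength - 1),
         value.substring(p.logLevelBeginIdx, p.logLevelBeginIdx + p.logLevelMaxLength - 1))
      }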

---Original---
From: "lk_spark"<lk...@163.com>
Date: 2017/2/27 11:14:23
To: "user.spark"<us...@spark.apache.org>;
Subject: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext


  hi, all:
        I want to extract some info from Kafka using Spark Streaming. My code looks like:
        
    val keyword = ""
    val system = "dmp"
    val datetime_idx = 0
    val datetime_length = 23
    val logLevelBeginIdx = datetime_length + 2 - 1
    val logLevelMaxLength = 5

    val lines = messages.filter(record => record.value().matches("\\d{4}.*")).map(record => {
      val assembly = record.topic()
      val value = record.value
      val datatime = value.substring(datetime_idx, datetime_length - 1)
      val level = value.substring(logLevelBeginIdx, logLevelBeginIdx + logLevelMaxLength - 1)
      (assembly, value, datatime, level)
    })
     
     I will get this error:
     Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Serialization stack:
 - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@5a457aa1)
 - field (class: $iw, name: streamingContext, type: class org.apache.spark.streaming.StreamingContext)
 - object (class $iw, $iw@38eb2140)
 - field (class: $iw, name: $iw, type: class $iw)
 - object (class $iw, $iw@2a3ced3d)
 - field (class: $iw, name: $iw, type: class $iw)
 - object (class $iw, $iw@7c5dbca5)
....
 ==================================================================================
   if I change the parameters to constants, I do not get the error:
   
   val lines = messages.filter(record => record.value().matches("\\d{4}.*")).map(record => {
      val assembly = record.topic()
      val value = record.value
      val datatime = value.substring(0, 22)
      val level = value.substring(24, 27)
      (assembly, value, datatime, level)
    })
  
 How can I pass parameters to the map function?
  
  2017-02-27
 
 lk_spark

Re: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

Posted by Lysiane Bouchard <bo...@gmail.com>.
Hi,

The error message indicates that a StreamingContext object ends up in the
fields of the closure that Spark tries to serialize.

Could you show us the enclosing function and component?

The workarounds proposed in the following Stack Overflow answer might help
you to fix the problem:
  http://stackoverflow.com/a/30094847
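
For example, one of those workarounds is to copy each value into a local
val just before building the DStream, so the closure captures plain Ints
instead of the enclosing object whose fields include the StreamingContext.
A rough sketch using the names from the original post (untested):

    // Local copies: the closure below captures only these Ints, not
    // the outer object that holds the StreamingContext.
    val dtIdx    = datetime_idx
    val dtLen    = datetime_length
    val lvlBegin = logLevelBeginIdx
    val lvlMax   = logLevelMaxLength

    val lines = messages
      .filter(record => record.value().matches("\\d{4}.*"))
      .map { record =>
        val value = record.value()
        (record.topic(),
         value,
         value.substring(dtIdx, dtLen - 1),
         value.substring(lvlBegin, lvlBegin + lvlMax - 1))
      }

In the spark-shell specifically, declaring the context as
@transient val streamingContext = ... is another commonly suggested fix,
since the REPL wraps top-level vals into objects that Spark then tries
to serialize.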


