Posted to user@spark.apache.org by lk_spark <lk...@163.com> on 2017/02/27 03:14:00 UTC

java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

hi, all:
       I want to extract some info from Kafka using Spark Streaming. My code looks like:
       
    val keyword = ""
    val system = "dmp"
    val datetime_idx = 0
    val datetime_length = 23
    val logLevelBeginIdx = datetime_length + 2 - 1
    val logLevelMaxLenght = 5
       
    val lines = messages.filter(record => record.value().matches("\\d{4}.*")).map(record => {
      val assembly = record.topic()
      val value = record.value
      val datatime = value.substring(datetime_idx, datetime_length - 1)
      val level = value.substring(logLevelBeginIdx, logLevelBeginIdx + logLevelMaxLenght - 1)
      (assembly,value,datatime,level)
    })
    
    I get this error:
    Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Serialization stack:
 - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@5a457aa1)
 - field (class: $iw, name: streamingContext, type: class org.apache.spark.streaming.StreamingContext)
 - object (class $iw, $iw@38eb2140)
 - field (class: $iw, name: $iw, type: class $iw)
 - object (class $iw, $iw@2a3ced3d)
 - field (class: $iw, name: $iw, type: class $iw)
 - object (class $iw, $iw@7c5dbca5)
....
==================================================================================
  If I replace the parameters with constants, I don't get the error:
  
  val lines = messages.filter(record => record.value().matches("\\d{4}.*")).map(record => {
      val assembly = record.topic()
      val value = record.value
      val datatime = value.substring(0, 22)
      val level = value.substring(24, 27)
      (assembly,value,datatime,level)
    })

How can I pass parameters to the map function?
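For context on why the StreamingContext ends up in the closure: in the spark-shell, every top-level val becomes a field of a REPL wrapper object (the $iw objects in the stack trace above), so a lambda that reads datetime_length actually captures the whole wrapper, StreamingContext included. The mechanism can be sketched outside Spark with plain Java serialization standing in for Spark's closure serializer; the Context and Wrapper names below are illustrative stand-ins, not Spark API:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for StreamingContext: deliberately NOT Serializable.
class Context

// Stand-in for the REPL's $iw wrapper: Serializable itself, but it holds a
// non-serializable field, just like the wrapper holds the StreamingContext.
class Wrapper extends Serializable {
  val context        = new Context
  val datetimeLength = 23

  // Reading the field makes the lambda capture `this`, dragging `context`
  // along, so serializing the closure fails with NotSerializableException.
  def capturesWrapper: String => String =
    value => value.substring(0, datetimeLength)

  // Copying the val into a local first: the lambda captures only an Int.
  def capturesLocalCopy: String => String = {
    val len = datetimeLength
    value => value.substring(0, len)
  }
}

// True if `obj` survives Java serialization (what Spark does to closures).
def serializes(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
    true
  } catch { case _: NotSerializableException => false }
```

So one common workaround is to copy each parameter into a local val immediately before building the map closure, so the closure only captures the primitives it needs rather than the REPL wrapper.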

2017-02-27


lk_spark