Posted to user@spark.apache.org by lk_spark <lk...@163.com> on 2017/02/27 03:14:00 UTC
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
hi, all:
I want to extract some info from Kafka using Spark Streaming. My code looks like:
val keyword = ""
val system = "dmp"
val datetime_idx = 0
val datetime_length = 23
val logLevelBeginIdx = datetime_length + 2 - 1
val logLevelMaxLenght = 5
val lines = messages.filter(record => record.value().matches("\\d{4}.*")).map(record => {
val assembly = record.topic()
val value = record.value
val datatime = value.substring(datetime_idx, datetime_length - 1)
val level = value.substring(logLevelBeginIdx, logLevelBeginIdx + logLevelMaxLenght - 1)
(assembly,value,datatime,level)
})
I get this error:
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Serialization stack:
- object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@5a457aa1)
- field (class: $iw, name: streamingContext, type: class org.apache.spark.streaming.StreamingContext)
- object (class $iw, $iw@38eb2140)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2a3ced3d)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7c5dbca5)
....
==================================================================================
If I change the parameters to constants, I do not get the error:
val lines = messages.filter(record => record.value().matches("\\d{4}.*")).map(record => {
  val assembly = record.topic()
  val value = record.value
  val datetime = value.substring(0, 22)
  val level = value.substring(24, 27)
  (assembly, value, datetime, level)
})
How can I pass parameters to the map function?
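[Editor's note: a minimal sketch of one common fix, assuming the code above runs in the spark-shell. In the shell, top-level vals become fields of a REPL wrapper object (the `$iw` in the stack trace), and a closure that reads one of them drags the whole wrapper, including the StreamingContext field, into the task's serialized closure. Copying the values into plain local vals immediately before building the closure means the closure captures only those serializable locals. The `local*` names below are illustrative, not from the original.]

```scala
// Copy the outer vals into locals so the closure does not capture the
// REPL wrapper object that also holds the (non-serializable) StreamingContext.
val lines = {
  val localDatetimeIdx    = datetime_idx
  val localDatetimeLength = datetime_length
  val localLevelBegin     = logLevelBeginIdx
  val localLevelMaxLength = logLevelMaxLength
  messages.filter(record => record.value().matches("\\d{4}.*")).map { record =>
    val assembly = record.topic()
    val value    = record.value
    // Only the four local Ints above are serialized with this closure.
    val datetime = value.substring(localDatetimeIdx, localDatetimeLength - 1)
    val level    = value.substring(localLevelBegin, localLevelBegin + localLevelMaxLength - 1)
    (assembly, value, datetime, level)
  }
}
```

The same applies in a compiled application: referencing a field of an enclosing class captures `this`. An alternative there is to mark the StreamingContext field `@transient` so it is skipped during closure serialization.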
2017-02-27
lk_spark