Posted to user@spark.apache.org by backtrack5 <so...@live.com> on 2016/10/06 15:26:15 UTC

spark stateful streaming error

I am using PySpark stateful streaming (Spark 2.0) to receive JSON from a
socket. If I send only one message, I get the expected response, but if I
send more than one message, I get the error shown below.

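import hashlib
import json

def createmd5Hash(po):
    # Parse one received line as JSON and key it by the MD5 of 'somevalue'
    data = json.loads(po)
    return (hashlib.md5(data['somevalue'].encode('utf-8')).hexdigest(), data)
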
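As a quick local check (no Spark needed), the function turns one JSON line into a (key, value) pair, which is the shape reduceByKey expects. The sample record is made up for illustration; the only field the function relies on is 'somevalue'.

```python
import hashlib
import json


def createmd5Hash(po):
    # Parse one line as JSON and key it by the MD5 hex digest of 'somevalue'
    data = json.loads(po)
    return (hashlib.md5(data['somevalue'].encode('utf-8')).hexdigest(), data)


# Hypothetical sample record; any JSON object with a 'somevalue' string works
key, value = createmd5Hash('{"somevalue": "abc", "id": 1}')
print(key)    # 900150983cd24fb0d6963f7d28e17f72 (MD5 of "abc")
print(value)  # {'somevalue': 'abc', 'id': 1}
```
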
Implementation 1:

stream = ssc.socketTextStream("ip", 3341)
ssc.checkpoint('E:\\Work\\Python1\\work\\spark\\checkpoint\\')
initialStateRDD = sc.parallelize([(u'na', 1)])
# Key each record by its MD5 hash and keep one record per key in each batch
with_hash = stream.map(lambda po: createmd5Hash(po)).reduceByKey(lambda s1, s2: s1)
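socketTextStream connects to host:port as a client, decodes the received bytes as UTF-8 and splits them on "\n", so each line becomes one record passed to createmd5Hash. When testing with several messages, the sending side therefore has to put exactly one complete JSON document on each line. A minimal sketch of a test sender, assuming a plain TCP server on the host and port the stream connects to; the helper names frame_records and serve_once are hypothetical, not part of Spark.

```python
import json
import socket


def frame_records(records):
    # One compact JSON document per line; without indent, json.dumps emits no
    # raw newlines (newlines inside strings are escaped), so each record
    # stays on a single line.
    return "".join(json.dumps(r) + "\n" for r in records).encode("utf-8")


def serve_once(records, host="localhost", port=3341):
    # Hypothetical test server: socketTextStream connects as a client,
    # so this side listens, accepts one connection and sends all records.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((host, port))
        srv.listen(1)
        conn, _ = srv.accept()
        with conn:
            conn.sendall(frame_records(records))


if __name__ == "__main__":
    payload = frame_records([{"somevalue": "a"}, {"somevalue": "b"}])
    print(payload)  # b'{"somevalue": "a"}\n{"somevalue": "b"}\n'
```

Calling serve_once blocks until the streaming job connects, so start it before (or alongside) the Spark job.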
Implementation 2:

stream = ssc.textFileStream('C:\\sparkpoc\\input')
ssc.checkpoint('E:\\Work\\Python1\\work\\spark\\checkpoint\\')
initialStateRDD = sc.parallelize([(u'na', 1)])
# Same pipeline as above; only the input source differs
with_hash = stream.map(lambda po: createmd5Hash(po)).reduceByKey(lambda s1, s2: s1)
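In both implementations, reduceByKey(lambda s1, s2: s1) keeps a single value per MD5 key, so records whose 'somevalue' repeats within one batch are collapsed. The plain-Python emulation below (no Spark; the helper name reduce_by_key is hypothetical) shows the effect on one batch. Spark does not guarantee which duplicate survives, since values are combined per partition and merged in no fixed order; the emulation simply keeps the first one it sees.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    # Group values by key, then fold each group with func, like reduceByKey
    groups = {}
    for k, v in pairs:
        groups.setdefault(k, []).append(v)
    return {k: reduce(func, vs) for k, vs in groups.items()}


batch = [("h1", {"id": 1}), ("h2", {"id": 2}), ("h1", {"id": 3})]
print(reduce_by_key(batch, lambda s1, s2: s1))
# {'h1': {'id': 1}, 'h2': {'id': 2}}
```
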


To be specific, I get the expected result when I read the JSON from the file
system with textFileStream, but I get the following error when I use the
socket stream (socketTextStream):

16/10/06 20:50:42 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "E:\Work\spark\installtion\spark\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
  File "E:\Work\spark\installtion\spark\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
  File "E:\Work\spark\installtion\spark\python\pyspark\rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "E:\Work\spark\installtion\spark\python\pyspark\rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "E:\Work\spark\installtion\spark\python\pyspark\rdd.py", line 317, in func
    return f(iterator)
  File "E:\Work\spark\installtion\spark\python\pyspark\rdd.py", line 1792, in combineLocally
    merger.mergeValues(iterator)
  File "E:\Work\spark\installtion\spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 236, in mergeValues
    for k, v in iterator:
  File "E:/Work/Python1/work/spark/streamexample.py", line 159, in <lambda>
    with_hash = stream.map(lambda po : createmd5Hash(po)).reduceByKey(lambda s1,s2:s1)
  File "E:/Work/Python1/work/spark/streamexample.py", line 31, in createmd5Hash
    data = json.loads(input_line)
  File "C:\Python34\lib\json\__init__.py", line 318, in loads
    return _default_decoder.decode(s)
  File "C:\Python34\lib\json\decoder.py", line 343, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Python34\lib\json\decoder.py", line 361, in raw_decode
    raise ValueError(errmsg("Expecting value", s, err.value)) from None
ValueError: Expecting value: line 1 column 1 (char 0)

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Can someone please help me?
http://stackoverflow.com/questions/39897475/spark-stateful-streaming-error
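The last line of the Python traceback above, "Expecting value: line 1 column 1 (char 0)", is what json.loads raises when the very first character of its input cannot start a JSON value. Because socketTextStream turns every line into a separate record, a blank line, or one line of a multi-line (pretty-printed) JSON document such as a lone closing "}", would produce exactly this message. That is one possible explanation to check, not a confirmed diagnosis. The sketch below reproduces the message locally and shows one hedged workaround: a hypothetical parse_line helper that returns an empty list for lines that are not valid JSON, so it can be passed to flatMap instead of map to skip them.

```python
import hashlib
import json


def parse_line(line):
    # Hypothetical defensive variant of createmd5Hash: skip unparsable lines.
    # Returns a list so it can be passed to DStream.flatMap.
    try:
        data = json.loads(line)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return []
    return [(hashlib.md5(data['somevalue'].encode('utf-8')).hexdigest(), data)]


for bad in ["", "}"]:
    try:
        json.loads(bad)
    except ValueError as exc:
        print(repr(bad), "->", exc)
# '' -> Expecting value: line 1 column 1 (char 0)
# '}' -> Expecting value: line 1 column 1 (char 0)

print(parse_line(""))                          # []
print(len(parse_line('{"somevalue": "x"}')))   # 1
```

In the streaming job this would mean replacing stream.map(lambda po: createmd5Hash(po)) with stream.flatMap(parse_line). Whether silently dropping such lines is acceptable depends on what the sender is actually writing to the socket, so it is worth logging the raw lines first.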





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-stateful-streaming-error-tp27851.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
