Posted to user@spark.apache.org by Ajay <aj...@gmail.com> on 2015/07/20 12:26:20 UTC
PySpark Nested Json Parsing
Hi,
I am new to Apache Spark. I am trying to parse nested JSON using PySpark.
Here is the code with which I am trying to parse the JSON.
I am using Apache Spark 1.2.0 on Cloudera CDH 5.3.2.
lines = sc.textFile(inputFile)

import json

def func(x):
    json_str = json.loads(x)
    if json_str['label']:
        if json_str['label']['label2']:
            return (1,1)
    return (0,1)

lines.map(func).reduceByKey(lambda a,b: a + b).saveAsTextFile(outputFile)
I am getting the following error:
ERROR [Executor task launch worker-13] executor.Executor
(Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
line 107, in main
process()
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
line 98, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 247, in func
return f(iterator)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 1561, in combineLocally
merger.mergeValues(iterator)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py",
line 252, in mergeValues
for k, v in iterator:
File "<stdin>", line 2, in func
File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
obj, end = self._scanner.iterscan(s, **kw).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 855 (char 855)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12]
executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in
stage 14.0 (TID 24)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
line 107, in main
process()
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
line 98, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 247, in func
return f(iterator)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 1561, in combineLocally
merger.mergeValues(iterator)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py",
line 252, in mergeValues
for k, v in iterator:
File "<stdin>", line 2, in func
File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
obj, end = self._scanner.iterscan(s, **kw).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 734 (char 734)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-07-20 09:58:24,730 WARN [task-result-getter-0]
scheduler.TaskSetManager (Logging.scala:logWarning(71)) - Lost task 1.0 in
stage 14.0 (TID 25, localhost):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
line 107, in main
process()
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
line 98, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 247, in func
return f(iterator)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 1561, in combineLocally
merger.mergeValues(iterator)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py",
line 252, in mergeValues
for k, v in iterator:
File "<stdin>", line 2, in func
File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
obj, end = self._scanner.iterscan(s, **kw).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 855 (char 855)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-07-20 09:58:24,731 ERROR [task-result-getter-0]
scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 1 in stage
14.0 failed 1 times; aborting job
2015-07-20 09:58:24,731 INFO [task-result-getter-0]
scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
14.0, whose tasks have all completed, from pool
Traceback (most recent call last):
2015-07-20 09:58:24,732 INFO [sparkDriver-akka.actor.default-dispatcher-2]
scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage
14
File "<stdin>", line 1, in <module>
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 1288, in saveAsTextFile
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
line 300, in get_return_value
py4j.protocol2015-07-20 09:58:24,732 WARN [task-result-getter-1]
scheduler.TaskSetManager (Logging.scala:logWarning(71)) - Lost task 0.0 in
stage 14.0 (TID 24, localhost):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
.Py4JJavaError File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
line 107, in main
process()
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
line 98, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 247, in func
return f(iterator)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 1561, in combineLocally
merger.mergeValues(iterator)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py",
line 252, in mergeValues
for k, v in iterator:
File "<stdin>", line 2, in func
File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
obj, end = self._scanner.iterscan(s, **kw).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 734 (char 734)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-07-20 09:58:24,732 INFO [Thread-2] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 8 failed: saveAsTextFile at
NativeMethodAccessorImpl.java:-2, took 0.090286 s
2015-07-20 09:58:24,733 INFO [task-result-getter-1]
scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
14.0, whose tasks have all completed, from pool
: An error occurred while calling o276.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage
14.0 (TID 25, localhost): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
line 107, in main
process()
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
line 98, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 247, in func
return f(iterator)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
line 1561, in combineLocally
merger.mergeValues(iterator)
File
"/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py",
line 252, in mergeValues
for k, v in iterator:
File "<stdin>", line 2, in func
File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
obj, end = self._scanner.iterscan(s, **kw).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
rval, next_pos = action(m, context)
File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 855 (char 855)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
What am I doing wrong? Please guide.

Ajay Dubey
Re: PySpark Nested Json Parsing
Posted by Naveen Madhire <vm...@umail.iu.edu>.
I had a similar issue with Spark 1.3.
After migrating to Spark 1.4 and using sqlContext.read.json it worked well.
I think you can look at the DataFrame select and explode options to read the
nested JSON elements, arrays, etc.
Thanks.
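As a rough illustration of that suggestion (a minimal sketch, not code from this
thread; it assumes Spark 1.4+, reuses the label/label2 fields from the original
post, and uses a hypothetical array field named "items" purely for illustration):

from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlContext = SQLContext(sc)

# read.json expects one JSON object per line and infers a nested schema
df = sqlContext.read.json(inputFile)
df.printSchema()

# nested struct fields can be selected with dot notation
df.select("label.label2").show()

# explode() turns each element of an array column into one row per element;
# "items" is a hypothetical array field, not from the original data
df.select(explode(df.items).alias("item")).show()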
On Mon, Jul 20, 2015 at 11:07 AM, Davies Liu <da...@databricks.com> wrote:
> Could you try SQLContext.read.json()?
>
> On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu <da...@databricks.com> wrote:
> > Before using the json file as text file, can you make sure that each
> > json string can fit in one line? Because textFile() will split the
> > file by '\n'
> >
Re: PySpark Nested Json Parsing
Posted by Davies Liu <da...@databricks.com>.
Could you try SQLContext.read.json()?
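For reference, a minimal sketch of that call (assuming Spark 1.4 or later, where
the read.json entry point exists; on the 1.2/1.3 releases mentioned in this
thread, the older sqlContext.jsonFile(path) method plays a similar role):

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Expects one JSON record per line; depending on the Spark version, lines that
# fail to parse may be collected into a _corrupt_record column rather than
# failing the whole job.
df = sqlContext.read.json(inputFile)
df.printSchema()

This avoids hand-rolling json.loads on each line and gives a DataFrame with the
inferred nested schema.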
On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu <da...@databricks.com> wrote:
> Before using the json file as text file, can you make sure that each
> json string can fit in one line? Because textFile() will split the
> file by '\n'
>
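A quick way to check that point before reaching for the DataFrame API (a minimal
sketch, not code from the thread, reusing the inputFile variable from the
original post):

import json

def is_valid_json(line):
    # False for lines that are not a complete, standalone JSON value,
    # e.g. records split across lines or containing an invalid \escape
    try:
        json.loads(line)
        return True
    except ValueError:
        return False

bad = sc.textFile(inputFile).filter(lambda line: not is_valid_json(line))
print(bad.count())  # number of input lines that fail to parse
print(bad.take(1))  # inspect one offending line (the error above points near char 734/855)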
> On Mon, Jul 20, 2015 at 3:26 AM, Ajay <aj...@gmail.com> wrote:
>> Hi,
>>
>> I am new to Apache Spark. I am trying to parse nested json using pyspark.
>> Here is the code by which I am trying to parse Json.
>> I am using Apache Spark 1.2.0 version of cloudera CDH 5.3.2.
>>
>> lines = sc.textFile(inputFile)
>>
>> import json
>> def func(x):
>> json_str = json.loads(x)
>> if json_str['label']:
>> if json_str['label']['label2']:
>> return (1,1)
>> return (0,1)
>>
>> lines.map(func).reduceByKey(lambda a,b: a + b).saveAsTextFile(outputFile)
>>
>> I am getting following error,
>> ERROR [Executor task launch worker-13] executor.Executor
>> (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
>> line 107, in main
>> process()
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
>> line 98, in process
>> serializer.dump_stream(func(split_index, iterator), outfile)
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 2073, in pipeline_func
>> return func(split, prev_func(split, iterator))
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 2073, in pipeline_func
>> return func(split, prev_func(split, iterator))
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 247, in func
>> return f(iterator)
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 1561, in combineLocally
>> merger.mergeValues(iterator)
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py",
>> line 252, in mergeValues
>> for k, v in iterator:
>> File "<stdin>", line 2, in func
>> File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
>> return _default_decoder.decode(s)
>> File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
>> obj, end = self.raw_decode(s, idx=_w(s, 0).end())
>> File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
>> obj, end = self._scanner.iterscan(s, **kw).next()
>> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>> rval, next_pos = action(m, context)
>> File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
>> value, end = iterscan(s, idx=end, context=context).next()
>> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>> rval, next_pos = action(m, context)
>> File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
>> value, end = iterscan(s, idx=end, context=context).next()
>> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>> rval, next_pos = action(m, context)
>> File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
>> return scanstring(match.string, match.end(), encoding, strict)
>> ValueError: Invalid \escape: line 1 column 855 (char 855)
>>
>> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
>> at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> 2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12]
>> executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in
>> stage 14.0 (TID 24)
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
>> line 107, in main
>> process()
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
>> line 98, in process
>> serializer.dump_stream(func(split_index, iterator), outfile)
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 2073, in pipeline_func
>> return func(split, prev_func(split, iterator))
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 2073, in pipeline_func
>> return func(split, prev_func(split, iterator))
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 247, in func
>> return f(iterator)
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 1561, in combineLocally
>> merger.mergeValues(iterator)
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py",
>> line 252, in mergeValues
>> for k, v in iterator:
>> File "<stdin>", line 2, in func
>> File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
>> return _default_decoder.decode(s)
>> File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
>> obj, end = self.raw_decode(s, idx=_w(s, 0).end())
>> File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
>> obj, end = self._scanner.iterscan(s, **kw).next()
>> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>> rval, next_pos = action(m, context)
>> File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
>> value, end = iterscan(s, idx=end, context=context).next()
>> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>> rval, next_pos = action(m, context)
>> File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
>> value, end = iterscan(s, idx=end, context=context).next()
>> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>> rval, next_pos = action(m, context)
>> File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
>> return scanstring(match.string, match.end(), encoding, strict)
>> ValueError: Invalid \escape: line 1 column 734 (char 734)
>>
>> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
>> at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> 2015-07-20 09:58:24,730 WARN [task-result-getter-0]
>> scheduler.TaskSetManager (Logging.scala:logWarning(71)) - Lost task 1.0 in
>> stage 14.0 (TID 25, localhost): org.apache.spark.api.python.PythonException:
>> Traceback (most recent call last):
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
>> line 107, in main
>> process()
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
>> line 98, in process
>> serializer.dump_stream(func(split_index, iterator), outfile)
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 2073, in pipeline_func
>> return func(split, prev_func(split, iterator))
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 2073, in pipeline_func
>> return func(split, prev_func(split, iterator))
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 247, in func
>> return f(iterator)
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 1561, in combineLocally
>> merger.mergeValues(iterator)
>> File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py",
>> line 252, in mergeValues
>> for k, v in iterator:
>> File "<stdin>", line 2, in func
>> File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
>> return _default_decoder.decode(s)
>> File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
>> obj, end = self.raw_decode(s, idx=_w(s, 0).end())
>> File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
>> obj, end = self._scanner.iterscan(s, **kw).next()
>> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>> rval, next_pos = action(m, context)
>> File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
>> value, end = iterscan(s, idx=end, context=context).next()
>> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>> rval, next_pos = action(m, context)
>> File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
>> value, end = iterscan(s, idx=end, context=context).next()
>> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>> rval, next_pos = action(m, context)
>> File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
>> return scanstring(match.string, match.end(), encoding, strict)
>> ValueError: Invalid \escape: line 1 column 855 (char 855)
>>
>> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
>> at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> What am I doing wrong? Please guide.
>>
>> Ajay Dubey
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: PySpark Nested Json Parsing
Posted by Davies Liu <da...@databricks.com>.
Before reading the JSON file as a text file, can you make sure that each
JSON string fits on one line? textFile() splits the file by '\n', so a
record that spans multiple lines reaches json.loads() as fragments that
are not valid JSON.
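
As a rough, untested sketch (not a verified fix for this exact data): once
every record sits on its own line, wrapping json.loads() in try/except lets
the job count the records that fail to parse (such as the ones hitting the
invalid \escape error) instead of aborting the stage. It reuses sc,
inputFile and outputFile from the snippet below; the 'bad_json' key and the
other names are made up for illustration:

import json

def parse_counts(line):
    # Expect one complete JSON document per input line.
    try:
        record = json.loads(line)
    except ValueError:
        # Malformed JSON (for example a stray backslash escape):
        # count the record instead of failing the whole task.
        return ('bad_json', 1)
    if not isinstance(record, dict):
        return ('bad_json', 1)
    label = record.get('label')
    if isinstance(label, dict) and label.get('label2'):
        return ('has_label2', 1)
    return ('no_label2', 1)

lines = sc.textFile(inputFile)
counts = lines.map(parse_counts).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile(outputFile)

If the records genuinely span several lines, sc.wholeTextFiles() (one JSON
document per file) or rewriting the input as newline-delimited JSON are the
usual alternatives.
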
On Mon, Jul 20, 2015 at 3:26 AM, Ajay <aj...@gmail.com> wrote:
> Hi,
>
> I am new to Apache Spark. I am trying to parse nested JSON using PySpark.
> Here is the code with which I am trying to parse the JSON.
> I am using Apache Spark 1.2.0 from Cloudera CDH 5.3.2.
>
> lines = sc.textFile(inputFile)
>
> import json
> def func(x):
>     json_str = json.loads(x)
>     if json_str['label']:
>         if json_str['label']['label2']:
>             return (1,1)
>     return (0,1)
>
> lines.map(func).reduceByKey(lambda a,b: a + b).saveAsTextFile(outputFile)
>
> I am getting the following error:
> ERROR [Executor task launch worker-13] executor.Executor
> (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
> File
> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
> line 107, in main
> process()
> File
> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
> line 98, in process
> serializer.dump_stream(func(split_index, iterator), outfile)
> File
> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
> line 2073, in pipeline_func
> return func(split, prev_func(split, iterator))
> File
> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
> line 2073, in pipeline_func
> return func(split, prev_func(split, iterator))
> File
> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
> line 247, in func
> return f(iterator)
> File
> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
> line 1561, in combineLocally
> merger.mergeValues(iterator)
> File
> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py",
> line 252, in mergeValues
> for k, v in iterator:
> File "<stdin>", line 2, in func
> File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
> return _default_decoder.decode(s)
> File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
> obj, end = self.raw_decode(s, idx=_w(s, 0).end())
> File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
> obj, end = self._scanner.iterscan(s, **kw).next()
> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
> rval, next_pos = action(m, context)
> File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
> value, end = iterscan(s, idx=end, context=context).next()
> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
> rval, next_pos = action(m, context)
> File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
> value, end = iterscan(s, idx=end, context=context).next()
> File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
> rval, next_pos = action(m, context)
> File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
> return scanstring(match.string, match.end(), encoding, strict)
> ValueError: Invalid \escape: line 1 column 855 (char 855)
>
> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
> at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> What am I doing wrong? Please guide.
>
> Ajay Dubey
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org