Posted to user@spark.apache.org by Ajay <aj...@gmail.com> on 2015/07/20 12:26:20 UTC

PySpark Nested Json Parsing

Hi,

I am new to Apache Spark. I am trying to parse nested JSON using PySpark.
Here is the code with which I am trying to parse the JSON.
I am using Apache Spark 1.2.0 on Cloudera CDH 5.3.2.

lines = sc.textFile(inputFile)

import json

def func(x):
    # Emit (1, 1) when the nested label2 field is truthy, else (0, 1).
    json_str = json.loads(x)
    if json_str['label']:
        if json_str['label']['label2']:
            return (1, 1)
    return (0, 1)

lines.map(func).reduceByKey(lambda a, b: a + b).saveAsTextFile(outputFile)

I am getting the following error:
ERROR [Executor task launch worker-13] executor.Executor (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 107, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 247, in func
    return f(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 1561, in combineLocally
    merger.mergeValues(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py", line 252, in mergeValues
    for k, v in iterator:
  File "<stdin>", line 2, in func
  File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
    obj, end = self._scanner.iterscan(s, **kw).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
    return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 855 (char 855)

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12] executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in stage 14.0 (TID 24)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 107, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 247, in func
    return f(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 1561, in combineLocally
    merger.mergeValues(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py", line 252, in mergeValues
    for k, v in iterator:
  File "<stdin>", line 2, in func
  File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
    obj, end = self._scanner.iterscan(s, **kw).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
    return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 734 (char 734)

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-07-20 09:58:24,730 WARN  [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logWarning(71)) - Lost task 1.0 in stage 14.0 (TID 25, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 107, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 247, in func
    return f(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 1561, in combineLocally
    merger.mergeValues(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py", line 252, in mergeValues
    for k, v in iterator:
  File "<stdin>", line 2, in func
  File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
    obj, end = self._scanner.iterscan(s, **kw).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
    return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 855 (char 855)

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

2015-07-20 09:58:24,731 ERROR [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 1 in stage 14.0 failed 1 times; aborting job
2015-07-20 09:58:24,731 INFO  [task-result-getter-0] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 14.0, whose tasks have all completed, from pool
2015-07-20 09:58:24,732 INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage 14
2015-07-20 09:58:24,732 WARN  [task-result-getter-1] scheduler.TaskSetManager (Logging.scala:logWarning(71)) - Lost task 0.0 in stage 14.0 (TID 24, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 107, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 247, in func
    return f(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 1561, in combineLocally
    merger.mergeValues(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py", line 252, in mergeValues
    for k, v in iterator:
  File "<stdin>", line 2, in func
  File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
    obj, end = self._scanner.iterscan(s, **kw).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
    return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 734 (char 734)

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

2015-07-20 09:58:24,732 INFO  [Thread-2] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 8 failed: saveAsTextFile at NativeMethodAccessorImpl.java:-2, took 0.090286 s
2015-07-20 09:58:24,733 INFO  [task-result-getter-1] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 14.0, whose tasks have all completed, from pool

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 1288, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o276.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 25, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 107, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 247, in func
    return f(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 1561, in combineLocally
    merger.mergeValues(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py", line 252, in mergeValues
    for k, v in iterator:
  File "<stdin>", line 2, in func
  File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
    obj, end = self._scanner.iterscan(s, **kw).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
    return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 855 (char 855)

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What am I doing wrong? Please guide.

Ajay Dubey

Re: PySpark Nested Json Parsing

Posted by Naveen Madhire <vm...@umail.iu.edu>.
I had a similar issue with Spark 1.3.
After migrating to Spark 1.4 and using sqlContext.read.json it worked well.
You can look at the DataFrame select and explode options to read the
nested JSON elements, arrays, etc.; see the sketch below.
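
For example, something along these lines (a minimal sketch for Spark 1.4+;
the file path and the array column "items" are placeholders, not from the
original question):

from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlContext = SQLContext(sc)

# read.json expects one complete JSON record per line and infers the schema.
df = sqlContext.read.json("path/to/input.json")
df.printSchema()

# Nested struct fields can be selected with dot notation.
df.select("label.label2").show()

# explode() flattens an array column into one row per element.
df.select(explode(df.items).alias("item")).show()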

Thanks.


On Mon, Jul 20, 2015 at 11:07 AM, Davies Liu <da...@databricks.com> wrote:

> Could you try SQLContext.read.json()?
>
> On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu <da...@databricks.com> wrote:
> > Before using the json file as text file, can you make sure that each
> > json string can fit in one line? Because textFile() will split the
> > file by '\n'
> >

Re: PySpark Nested Json Parsing

Posted by Davies Liu <da...@databricks.com>.
Could you try SQLContext.read.json()?
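
For example (a minimal sketch; read.json is the DataFrame reader API
introduced in Spark 1.4, and the path is a placeholder):

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Infers the schema, including nested structs, directly from the JSON records.
df = sqlContext.read.json("path/to/input.json")

# On Spark 1.2/1.3 the rough equivalent is:
# df = sqlContext.jsonFile("path/to/input.json")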

On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu <da...@databricks.com> wrote:
> Before using the json file as text file, can you make sure that each
> json string can fit in one line? Because textFile() will split the
> file by '\n'
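
The ValueError above ("Invalid \escape") suggests that some records contain
malformed JSON as well. A minimal sketch (assuming one record per line) for
counting and inspecting the offending lines instead of failing the whole job:

import json

def parse(line):
    # Keep (record, None) for good lines and (None, line) for bad ones,
    # so malformed records can be inspected rather than crash the job.
    try:
        return (json.loads(line), None)
    except ValueError:
        return (None, line)

pairs = sc.textFile(inputFile).map(parse)
records = pairs.filter(lambda p: p[1] is None).map(lambda p: p[0])
bad_lines = pairs.filter(lambda p: p[0] is None).map(lambda p: p[1])

print bad_lines.count()   # number of lines that fail to parse
print bad_lines.take(1)   # sample an offending line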
>> line 252, in mergeValues
>>     for k, v in iterator:
>>   File "<stdin>", line 2, in func
>>   File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
>>     return _default_decoder.decode(s)
>>   File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
>>     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
>>   File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
>>     obj, end = self._scanner.iterscan(s, **kw).next()
>>   File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>>     rval, next_pos = action(m, context)
>>   File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
>>     value, end = iterscan(s, idx=end, context=context).next()
>>   File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>>     rval, next_pos = action(m, context)
>>   File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
>>     value, end = iterscan(s, idx=end, context=context).next()
>>   File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>>     rval, next_pos = action(m, context)
>>   File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
>>     return scanstring(match.string, match.end(), encoding, strict)
>> ValueError: Invalid \escape: line 1 column 734 (char 734)
>>
>> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
>> at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 2015-07-20 09:58:24,732 INFO  [Thread-2] scheduler.DAGScheduler
>> (Logging.scala:logInfo(59)) - Job 8 failed: saveAsTextFile at
>> NativeMethodAccessorImpl.java:-2, took 0.090286 s
>> 2015-07-20 09:58:24,733 INFO  [task-result-getter-1]
>> scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
>> 14.0, whose tasks have all completed, from pool
>> : An error occurred while calling o276.saveAsTextFile.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
>> in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage
>> 14.0 (TID 25, localhost): org.apache.spark.api.python.PythonException:
>> Traceback (most recent call last):
>>   File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
>> line 107, in main
>>     process()
>>   File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py",
>> line 98, in process
>>     serializer.dump_stream(func(split_index, iterator), outfile)
>>   File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 2073, in pipeline_func
>>     return func(split, prev_func(split, iterator))
>>   File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 2073, in pipeline_func
>>     return func(split, prev_func(split, iterator))
>>   File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 247, in func
>>     return f(iterator)
>>   File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py",
>> line 1561, in combineLocally
>>     merger.mergeValues(iterator)
>>   File
>> "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py",
>> line 252, in mergeValues
>>     for k, v in iterator:
>>   File "<stdin>", line 2, in func
>>   File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
>>     return _default_decoder.decode(s)
>>   File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
>>     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
>>   File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
>>     obj, end = self._scanner.iterscan(s, **kw).next()
>>   File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>>     rval, next_pos = action(m, context)
>>   File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
>>     value, end = iterscan(s, idx=end, context=context).next()
>>   File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>>     rval, next_pos = action(m, context)
>>   File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
>>     value, end = iterscan(s, idx=end, context=context).next()
>>   File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
>>     rval, next_pos = action(m, context)
>>   File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
>>     return scanstring(match.string, match.end(), encoding, strict)
>> ValueError: Invalid \escape: line 1 column 855 (char 855)
>>
>> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
>> at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> What am I doing wrong? Please guide.
>>
>> Ajay Dubey



Re: PySpark Nested Json Parsing

Posted by Davies Liu <da...@databricks.com>.
Before using the JSON file as a text file, can you make sure that each
JSON string fits on one line? textFile() splits the file by '\n', so a
record that spans several lines reaches json.loads() as an unparseable
fragment.
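
Independent of that, wrapping the parse in a try/except keeps one bad
record from failing the whole job. A minimal sketch (not from this
thread; it reuses Ajay's sc, inputFile and outputFile, and assumes
'label' is either absent or a nested object):

import json

def parse_line(line):
    # json.loads() raises ValueError on malformed input, e.g. lines
    # with invalid \escapes or records split mid-object by textFile().
    try:
        doc = json.loads(line)
    except ValueError:
        return ('invalid', 1)
    # dict.get() avoids a KeyError when 'label' or 'label2' is missing.
    if doc.get('label', {}).get('label2'):
        return (1, 1)
    return (0, 1)

counts = sc.textFile(inputFile).map(parse_line) \
           .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile(outputFile)

The ('invalid', n) entry in the output then shows how many records
failed to parse, which tells you whether the file really is one JSON
document per line.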

On Mon, Jul 20, 2015 at 3:26 AM, Ajay <aj...@gmail.com> wrote:
> Hi,
>
> I am new to Apache Spark. I am trying to parse nested JSON using PySpark.
> Here is the code with which I am trying to parse the JSON.
> I am using Apache Spark 1.2.0 on Cloudera CDH 5.3.2.
>
> lines = sc.textFile(inputFile)
>
> import json
> def func(x):
>     json_str = json.loads(x)
>     if json_str['label']:
>         if json_str['label']['label2']:
>             return (1, 1)
>     return (0, 1)
>
> lines.map(func).reduceByKey(lambda a, b: a + b).saveAsTextFile(outputFile)
>
> I am getting the following error:
>
> ValueError: Invalid \escape: line 1 column 855 (char 855)
>
> [duplicate executor and driver stack traces trimmed]
>
> What am I doing wrong? Please guide.
>
> Ajay Dubey

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org