Posted to user@spark.apache.org by Mohamed Lrhazi <Mo...@georgetown.edu> on 2014/12/09 20:32:22 UTC

PySpark and UnsupportedOperationException

While trying simple examples of PySpark code, I systematically get these
failures when I try this. I don't see any prior exceptions in the output...
How can I debug further to find the root cause?


from operator import add  # used by reduceByKey(add) below

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource" : "en_2014/doc",
        "es.nodes":"rap-es2",
        "es.query" :  """{"query":{"match_all":{}},"fields":["title"],
"size": 100}"""
        }
    )


titles = es_rdd.map(lambda d: d[1]['title'][0])
counts = titles.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)


output = counts.collect()
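
A quick isolation step, as a rough sketch reusing the es_rdd above: pull the first
record off the Hadoop RDD before attaching any of the word-count lambdas, to see
whether the Elasticsearch read itself is what throws.

# Sketch: a bare read with no flatMap/map/reduceByKey attached. If this also fails
# with UnsupportedOperationException, the problem is in the EsInputFormat read path
# rather than in the Python lambdas; the printed value also shows whether
# d[1]['title'][0] matches the returned structure.
first = es_rdd.first()
print(first)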



...
14/12/09 19:27:20 INFO BlockManager: Removing broadcast 93
14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_93
14/12/09 19:27:20 INFO MemoryStore: Block broadcast_93 of size 2448 dropped
from memory (free 274984768)
14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 93
14/12/09 19:27:20 INFO BlockManager: Removing broadcast 92
14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_92
14/12/09 19:27:20 INFO MemoryStore: Block broadcast_92 of size 163391
dropped from memory (free 275148159)
14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 92
14/12/09 19:27:20 INFO BlockManager: Removing broadcast 91
14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_91
14/12/09 19:27:20 INFO MemoryStore: Block broadcast_91 of size 163391
dropped from memory (free 275311550)
14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 91
14/12/09 19:27:30 ERROR Executor: Exception in task 0.0 in stage 67.0 (TID
72)
java.lang.UnsupportedOperationException
        at java.util.AbstractMap.put(AbstractMap.java:203)
        at java.util.AbstractMap.putAll(AbstractMap.java:273)
        at
org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.setCurrentValue(EsInputFormat.java:373)
        at
org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.setCurrentValue(EsInputFormat.java:322)
        at
org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.next(EsInputFormat.java:299)
        at
org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.nextKeyValue(EsInputFormat.java:227)
        at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:138)
        at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at
scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
        at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
        at
scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:969)
        at
scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:339)
        at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1364)
        at
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
14/12/09 19:27:30 INFO TaskSetManager: Starting task 2.0 in stage 67.0 (TID
74, localhost, ANY, 26266 bytes)
14/12/09 19:27:30 INFO Executor: Running task 2.0 in stage 67.0 (TID 74)
14/12/09 19:27:30 WARN TaskSetManager: Lost task 0.0 in stage 67.0 (TID 72,
localhost): java.lang.UnsupportedOperationException:

Re: PySpark and UnsupportedOperationException

Posted by Mohamed Lrhazi <Mo...@georgetown.edu>.
Thanks, Davies. It turns out it was indeed a bug in elasticsearch-hadoop, and they
fixed it in last night's nightly build!

https://github.com/elasticsearch/elasticsearch-hadoop/issues/338
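
For anyone hitting the same thing, a minimal sketch of pointing a job at the patched
connector, assuming a locally downloaded nightly jar (the path and app name below are
placeholders):

from pyspark import SparkConf, SparkContext

# Placeholder path: wherever the elasticsearch-hadoop nightly snapshot jar was saved.
conf = (SparkConf()
        .setAppName("es-title-wordcount")
        .set("spark.jars", "/path/to/elasticsearch-hadoop-nightly-SNAPSHOT.jar"))
sc = SparkContext(conf=conf)

The same jar can also be passed with --jars when launching pyspark or spark-submit.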

On Wed, Dec 10, 2014 at 2:52 AM, Davies Liu <da...@databricks.com> wrote:

> On Tue, Dec 9, 2014 at 11:32 AM, Mohamed Lrhazi
> <Mo...@georgetown.edu> wrote:
> > While trying simple examples of PySpark code, I systematically get these
> > failures when I try this. I don't see any prior exceptions in the
> > output...
> > How can I debug further to find the root cause?
> >
> >
> > es_rdd = sc.newAPIHadoopRDD(
> >     inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
> >     keyClass="org.apache.hadoop.io.NullWritable",
> >     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
> >     conf={
> >         "es.resource" : "en_2014/doc",
> >         "es.nodes":"rap-es2",
> >         "es.query" :  """{"query":{"match_all":{}},"fields":["title"],
> > "size": 100}"""
> >         }
> >     )
> >
> >
> > titles=es_rdd.map(lambda d: d[1]['title'][0])
> > counts = titles.flatMap(lambda x: x.split(' ')).map(lambda x: (x,
> > 1)).reduceByKey(add)
> > output = counts.collect()
> >
> >
> >
> > ...
> > 14/12/09 19:27:20 INFO BlockManager: Removing broadcast 93
> > 14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_93
> > 14/12/09 19:27:20 INFO MemoryStore: Block broadcast_93 of size 2448
> dropped
> > from memory (free 274984768)
> > 14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 93
> > 14/12/09 19:27:20 INFO BlockManager: Removing broadcast 92
> > 14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_92
> > 14/12/09 19:27:20 INFO MemoryStore: Block broadcast_92 of size 163391
> > dropped from memory (free 275148159)
> > 14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 92
> > 14/12/09 19:27:20 INFO BlockManager: Removing broadcast 91
> > 14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_91
> > 14/12/09 19:27:20 INFO MemoryStore: Block broadcast_91 of size 163391
> > dropped from memory (free 275311550)
> > 14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 91
> > 14/12/09 19:27:30 ERROR Executor: Exception in task 0.0 in stage 67.0
> (TID
> > 72)
> > java.lang.UnsupportedOperationException
> >         at java.util.AbstractMap.put(AbstractMap.java:203)
> >         at java.util.AbstractMap.putAll(AbstractMap.java:273)
> >         at
> >
> org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.setCurrentValue(EsInputFormat.java:373)
> >         at
>
> It looks like it's a bug in elasticsearch-hadoop (EsInputFormat).
>
> >
> org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.setCurrentValue(EsInputFormat.java:322)
> >         at
> >
> org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.next(EsInputFormat.java:299)
> >         at
> >
> org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.nextKeyValue(EsInputFormat.java:227)
> >         at
> > org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:138)
> >         at
> >
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >         at
> >
> scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
> >         at
> scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
> >         at
> > scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:969)
> >         at
> > scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >         at
> >
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:339)
> >         at
>
> This means that the task failed while reading the data from EsInputFormat
> to feed the Python mapper.
>
> >
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
> >         at
> >
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
> >         at
> >
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
> >         at
> > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1364)
> >         at
> >
> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
> > 14/12/09 19:27:30 INFO TaskSetManager: Starting task 2.0 in stage 67.0
> (TID
> > 74, localhost, ANY, 26266 bytes)
> > 14/12/09 19:27:30 INFO Executor: Running task 2.0 in stage 67.0 (TID 74)
> > 14/12/09 19:27:30 WARN TaskSetManager: Lost task 0.0 in stage 67.0 (TID
> 72,
> > localhost): java.lang.UnsupportedOperationException:
>

Re: PySpark and UnsupportedOperationException

Posted by Davies Liu <da...@databricks.com>.
On Tue, Dec 9, 2014 at 11:32 AM, Mohamed Lrhazi
<Mo...@georgetown.edu> wrote:
> While trying simple examples of PySpark code, I systematically get these
> failures when I try this. I don't see any prior exceptions in the output...
> How can I debug further to find the root cause?
>
>
> es_rdd = sc.newAPIHadoopRDD(
>     inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
>     keyClass="org.apache.hadoop.io.NullWritable",
>     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
>     conf={
>         "es.resource" : "en_2014/doc",
>         "es.nodes":"rap-es2",
>         "es.query" :  """{"query":{"match_all":{}},"fields":["title"],
> "size": 100}"""
>         }
>     )
>
>
> titles=es_rdd.map(lambda d: d[1]['title'][0])
> counts = titles.flatMap(lambda x: x.split(' ')).map(lambda x: (x,
> 1)).reduceByKey(add)
> output = counts.collect()
>
>
>
> ...
> 14/12/09 19:27:20 INFO BlockManager: Removing broadcast 93
> 14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_93
> 14/12/09 19:27:20 INFO MemoryStore: Block broadcast_93 of size 2448 dropped
> from memory (free 274984768)
> 14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 93
> 14/12/09 19:27:20 INFO BlockManager: Removing broadcast 92
> 14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_92
> 14/12/09 19:27:20 INFO MemoryStore: Block broadcast_92 of size 163391
> dropped from memory (free 275148159)
> 14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 92
> 14/12/09 19:27:20 INFO BlockManager: Removing broadcast 91
> 14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_91
> 14/12/09 19:27:20 INFO MemoryStore: Block broadcast_91 of size 163391
> dropped from memory (free 275311550)
> 14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 91
> 14/12/09 19:27:30 ERROR Executor: Exception in task 0.0 in stage 67.0 (TID
> 72)
> java.lang.UnsupportedOperationException
>         at java.util.AbstractMap.put(AbstractMap.java:203)
>         at java.util.AbstractMap.putAll(AbstractMap.java:273)
>         at
> org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.setCurrentValue(EsInputFormat.java:373)
>         at

It looks like it's a bug in elasticsearch-hadoop (EsInputFormat).

> org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.setCurrentValue(EsInputFormat.java:322)
>         at
> org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.next(EsInputFormat.java:299)
>         at
> org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.nextKeyValue(EsInputFormat.java:227)
>         at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:138)
>         at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at
> scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
>         at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
>         at
> scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:969)
>         at
> scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:339)
>         at

This means that the task failed while reading the data from EsInputFormat
to feed the Python mapper.
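
One way to see that in isolation, as a rough sketch reusing the es_rdd from the
snippet above, is to trigger the read with nothing but a count attached; the same
exception should surface without any of the word-count code involved.

# Sketch: any action forces the JVM writer thread to pull records out of
# EsInputFormat and stream them to the Python worker, so a bare count reproduces
# a read-side failure on its own.
es_rdd.count()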

> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
>         at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>         at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>         at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1364)
>         at
> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
> 14/12/09 19:27:30 INFO TaskSetManager: Starting task 2.0 in stage 67.0 (TID
> 74, localhost, ANY, 26266 bytes)
> 14/12/09 19:27:30 INFO Executor: Running task 2.0 in stage 67.0 (TID 74)
> 14/12/09 19:27:30 WARN TaskSetManager: Lost task 0.0 in stage 67.0 (TID 72,
> localhost): java.lang.UnsupportedOperationException:



Re: PySpark and UnsupportedOperationException

Posted by Mohamed Lrhazi <Mo...@georgetown.edu>.
Somehow I now get a better error trace, I believe for the same root
issue... Any advice on how to narrow this down further would be highly appreciated:


...
14/12/10 07:15:03 ERROR PythonRDD: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/pyspark/worker.py", line 75, in main
    command = pickleSer._read_with_length(infile)
  File "/spark/python/pyspark/serializers.py", line 146, in _read_with_length
    length = read_int(stream)
  File "/spark/python/pyspark/serializers.py", line 464, in read_int
    raise EOFError
EOFError

        at
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        at
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)




On Tue, Dec 9, 2014 at 2:32 PM, Mohamed Lrhazi <
Mohamed.Lrhazi@georgetown.edu> wrote:

> While trying simple examples of PySpark code, I systematically get these
> failures when I try this. I don't see any prior exceptions in the output...
> How can I debug further to find the root cause?
>
>
> es_rdd = sc.newAPIHadoopRDD(
>     inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
>     keyClass="org.apache.hadoop.io.NullWritable",
>     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
>     conf={
>         "es.resource" : "en_2014/doc",
>         "es.nodes":"rap-es2",
>         "es.query" :  """{"query":{"match_all":{}},"fields":["title"],
> "size": 100}"""
>         }
>     )
>
>
> titles=es_rdd.map(lambda d: d[1]['title'][0])
> counts = titles.flatMap(lambda x: x.split(' ')).map(lambda x: (x,
> 1)).reduceByKey(add)
>
>
> output = counts.collect()
>
>
>
> ...
> 14/12/09 19:27:20 INFO BlockManager: Removing broadcast 93
> 14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_93
> 14/12/09 19:27:20 INFO MemoryStore: Block broadcast_93 of size 2448
> dropped from memory (free 274984768)
> 14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 93
> 14/12/09 19:27:20 INFO BlockManager: Removing broadcast 92
> 14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_92
> 14/12/09 19:27:20 INFO MemoryStore: Block broadcast_92 of size 163391
> dropped from memory (free 275148159)
> 14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 92
> 14/12/09 19:27:20 INFO BlockManager: Removing broadcast 91
> 14/12/09 19:27:20 INFO BlockManager: Removing block broadcast_91
> 14/12/09 19:27:20 INFO MemoryStore: Block broadcast_91 of size 163391
> dropped from memory (free 275311550)
> 14/12/09 19:27:20 INFO ContextCleaner: Cleaned broadcast 91
> 14/12/09 19:27:30 ERROR Executor: Exception in task 0.0 in stage 67.0 (TID
> 72)
> java.lang.UnsupportedOperationException
>         at java.util.AbstractMap.put(AbstractMap.java:203)
>         at java.util.AbstractMap.putAll(AbstractMap.java:273)
>         at
> org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.setCurrentValue(EsInputFormat.java:373)
>         at
> org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.setCurrentValue(EsInputFormat.java:322)
>         at
> org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.next(EsInputFormat.java:299)
>         at
> org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.nextKeyValue(EsInputFormat.java:227)
>         at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:138)
>         at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at
> scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
>         at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
>         at
> scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:969)
>         at
> scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:339)
>         at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
>         at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>         at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>         at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1364)
>         at
> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
> 14/12/09 19:27:30 INFO TaskSetManager: Starting task 2.0 in stage 67.0
> (TID 74, localhost, ANY, 26266 bytes)
> 14/12/09 19:27:30 INFO Executor: Running task 2.0 in stage 67.0 (TID 74)
> 14/12/09 19:27:30 WARN TaskSetManager: Lost task 0.0 in stage 67.0 (TID
> 72, localhost): java.lang.UnsupportedOperationException:
>