Posted to user@spark.apache.org by lonely Feb <lo...@gmail.com> on 2015/03/23 08:43:59 UTC

Spark Sql with python udf fail

Hi all, I am trying to migrate some Hive jobs to Spark SQL. When I ran a SQL
job with a Python UDF, I got an exception:

java.lang.ArrayIndexOutOfBoundsException: 9
        at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
        at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
        at org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166)
        at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
        at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:156)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
        at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
        at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

I suspect there is an odd line in the input file, but the file is very
large and I could not find any abnormal lines after running several jobs to
check. How can I find the abnormal line here?

Re: Spark Sql with python udf fail

Posted by lonely Feb <lo...@gmail.com>.
sql("SELECT * FROM <your-table>").foreach(println)

can be executed successfully, so the problem may still be in the UDF code.
How can I print the line that triggers the ArrayIndexOutOfBoundsException in
Catalyst?

Re: Spark Sql with python udf fail

Posted by lonely Feb <lo...@gmail.com>.
OK, I'll try it ASAP.

Re: Spark Sql with python udf fail

Posted by Cheng Lian <li...@gmail.com>.
I suspect there is a malformed row in your input dataset. Could you try
something like this to confirm:

sql("SELECT * FROM <your-table>").foreach(println)

If a malformed line does exist, you should see a similar exception, and
the printed output should help you locate the offending row. Note that the
messages are printed to stdout on the executor side.
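
If scanning executor stdout is impractical for a very large input, the same
idea can be turned into an explicit check. The following is only a minimal
sketch in plain Python, assuming the input is delimited text (tab-separated
here) and that the expected column count is known from the table schema. The
function name find_malformed and the sample rows are hypothetical and do not
come from this thread.

```python
def find_malformed(lines, expected_columns, sep="\t"):
    """Yield (line_number, field_count, line) for every line whose
    number of fields differs from expected_columns."""
    for number, raw in enumerate(lines, start=1):
        line = raw.rstrip("\n")
        fields = line.split(sep)
        if len(fields) != expected_columns:
            yield number, len(fields), line


if __name__ == "__main__":
    # Hypothetical sample: the schema is assumed to have 3 columns.
    sample = [
        "a\tb\tc\n",
        "a\tb\n",        # one field short
        "a\tb\tc\td\n",  # one field too many
    ]
    for number, count, line in find_malformed(sample, expected_columns=3):
        print("line %d has %d fields: %r" % (number, count, line))
```

On a cluster, the same per-line predicate could presumably be applied with an
RDD filter over the raw text input instead of a local loop, so that only the
offending lines are collected.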

Re: Spark Sql with python udf fail

Posted by lonely Feb <lo...@gmail.com>.
I caught exceptions in the Python UDF code, flushed them to a single file,
and made sure the number of columns in each output line matches the SQL
schema.

Interestingly, each output line of the UDF has exactly 10 columns, and the
exception above is java.lang.ArrayIndexOutOfBoundsException: 9. Does that
suggest anything?
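
Since ordinal 9 is the tenth column, ArrayIndexOutOfBoundsException: 9 would
be consistent with some row reaching Catalyst with at most nine fields, even
though the UDF wrote ten. One possibility, which is only an assumption and is
not confirmed anywhere in this thread, is that an empty trailing column gets
dropped when the line is split again: Java's String.split(regex), for
example, discards trailing empty strings by default, while Python's
str.split(sep) keeps them. The sketch below only mimics that dropping
behaviour in plain Python; the function name and the sample line are
hypothetical.

```python
def split_dropping_trailing_empties(line, sep="\t"):
    """Split on sep, then drop empty fields at the end of the row."""
    fields = line.split(sep)
    while fields and fields[-1] == "":
        fields.pop()
    return fields


# Hypothetical UDF output line: 10 columns, the last one empty.
line = "\t".join(["v%d" % i for i in range(9)] + [""])

kept = line.split("\t")                          # Python keeps all 10 fields
dropped = split_dropping_trailing_empties(line)  # trailing empty field removed

print(len(kept), len(dropped))

try:
    dropped[9]  # ordinal 9 is the tenth column
except IndexError as exc:
    print("index 9 is out of range:", exc)
```

If this is what happens, checking the column count inside the UDF would not
reveal the problem; counting fields after the row has been parsed on the
Spark side would.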

Re: Spark Sql with python udf fail

Posted by Cheng Lian <li...@gmail.com>.
Could you elaborate on the UDF code?
