Posted to user@spark.apache.org by Jon Chase <jo...@gmail.com> on 2015/03/27 12:00:24 UTC

Spark SQL "lateral view explode" doesn't work, and unable to save array types to Parquet

Spark 1.3.0

Two issues:

a) I'm unable to get a "lateral view explode" query to work on an array type
b) I'm unable to save an array type to a Parquet file

I keep running into this:

      java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
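
For reference, "[I" is the JVM's runtime name for int[]. The same cast failure can be reproduced outside Spark; here is a minimal sketch (class name is illustrative, assumes only scala-library 2.10.x on the classpath):

    import scala.collection.Seq;

    public class CastDemo {
        public static void main(String[] args) {
            Object value = new int[]{1, 2, 3};
            // Prints "[I" -- the JVM's internal name for a primitive int array.
            System.out.println(value.getClass().getName());
            // An int[] is not a scala.collection.Seq, so this cast throws
            // java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
            Seq<?> seq = (Seq<?>) value;
            System.out.println(seq);
        }
    }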

Here's a stack trace from the explode issue:

root
 |-- col1: string (nullable = false)
 |-- col2s: array (nullable = true)
 |    |-- element: integer (containsNull = true)

ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0 (TID 15)
java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) ~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) ~[spark-sql_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) ~[spark-sql_2.10-1.3.0.jar:1.3.0]
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) ~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) ~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.to(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) ~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) ~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.0.jar:1.3.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_31]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
WARN  o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID 15, localhost): java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)



Maybe I'm defining the schema incorrectly?



This test demonstrates both issues:

    @Rule
    public TemporaryFolder tmp = new TemporaryFolder();

    @Test
    public void testPercentileWithExplode() throws Exception {
        StructType schema = DataTypes.createStructType(Lists.newArrayList(
                DataTypes.createStructField("col1", DataTypes.StringType, false),
                DataTypes.createStructField("col2s", DataTypes.createArrayType(DataTypes.IntegerType, true), true)
        ));

        JavaRDD<Row> rowRDD = sc.parallelize(Lists.newArrayList(
                RowFactory.create("test", new int[]{1, 2, 3})
        ));

        DataFrame df = sql.createDataFrame(rowRDD, schema);
        df.registerTempTable("df");
        df.printSchema();

        List<int[]> ints = sql.sql("select col2s from df").javaRDD()
                              .map(row -> (int[]) row.get(0)).collect();
        assertEquals(1, ints.size());
        assertArrayEquals(new int[]{1, 2, 3}, ints.get(0));

        // fails: lateral view explode does not work:
        // java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
        List<Integer> explodedInts = sql.sql("select col2 from df lateral view explode(col2s) splode as col2")
                                        .javaRDD()
                                        .map(row -> row.getInt(0)).collect();
        assertEquals(3, explodedInts.size());
        assertEquals(Lists.newArrayList(1, 2, 3), explodedInts);

        // fails: java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
        df.saveAsParquetFile(tmp.getRoot().getAbsolutePath() + "/parquet");

        DataFrame loadedDf = sql.load(tmp.getRoot().getAbsolutePath() + "/parquet");
        loadedDf.registerTempTable("loadedDf");
        List<int[]> moreInts = sql.sql("select col2s from loadedDf").javaRDD()
                                  .map(row -> (int[]) row.get(0)).collect();
        assertEquals(1, moreInts.size());
        assertArrayEquals(new int[]{1, 2, 3}, moreInts.get(0));
    }
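
A possible workaround, inferred only from the cast in the stack trace and untested against 1.3.0: build the array column as a Scala Seq of boxed Integers instead of a primitive int[], since both explode() and the Parquet save path apparently cast the value to a Seq. A sketch of the changed rowRDD construction:

    // Additional imports for this sketch:
    //   import scala.collection.JavaConversions;
    //   import scala.collection.Seq;

    // Box the primitives, then bridge the java.util.List to a Scala Buffer
    // (a Buffer is a Seq, which is what the failing cast expects).
    Seq<Integer> col2s = JavaConversions.asScalaBuffer(Lists.newArrayList(1, 2, 3));

    JavaRDD<Row> rowRDD = sc.parallelize(Lists.newArrayList(
            RowFactory.create("test", col2s)
    ));

If this works, the read-side casts to int[] in the test would also need to change to match the new representation.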

Re: Spark SQL "lateral view explode" doesn't work, and unable to save array types to Parquet

Posted by Cheng Lian <li...@gmail.com>.
Thanks for the detailed information!

On 3/27/15 9:16 PM, Jon Chase wrote:
> Done. I also updated the name on the ticket to include both issues.
> "Spark SQL arrays: "explode()" fails and cannot save array type to Parquet"
>
> https://issues.apache.org/jira/browse/SPARK-6570
>
> On Fri, Mar 27, 2015 at 8:14 AM, Cheng Lian <lian.cs.zju@gmail.com> wrote:
>
>     [...]


Re: Spark SQL "lateral view explode" doesn't work, and unable to save array types to Parquet

Posted by Jon Chase <jo...@gmail.com>.
Done. I also updated the name on the ticket to include both issues.
"Spark SQL arrays: "explode()" fails and cannot save array type to Parquet"

https://issues.apache.org/jira/browse/SPARK-6570

On Fri, Mar 27, 2015 at 8:14 AM, Cheng Lian <li...@gmail.com> wrote:

> Forgot to mention: would you mind also providing the full stack trace of
> the exception thrown in the saveAsParquetFile call? Thanks!
>
> Cheng
>
> On 3/27/15 7:35 PM, Jon Chase wrote:
>
> https://issues.apache.org/jira/browse/SPARK-6570
>
>  I also left in the call to saveAsParquetFile(), as it produced a similar
> exception (though there was no use of explode there).
>
> On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian <li...@gmail.com> wrote:
>
>> This should be a bug in Explode.eval(), which always assumes the
>> underlying SQL array is represented by a Scala Seq. Would you mind opening
>> a JIRA ticket for this? Thanks!
>>
>> Cheng
>>
>> On 3/27/15 7:00 PM, Jon Chase wrote:
>>
>> [...]

Re: Spark SQL "lateral view explode" doesn't work, and unable to save array types to Parquet

Posted by Cheng Lian <li...@gmail.com>.
Forgot to mention: would you mind also providing the full stack trace of
the exception thrown in the saveAsParquetFile call? Thanks!

Cheng

On 3/27/15 7:35 PM, Jon Chase wrote:
> https://issues.apache.org/jira/browse/SPARK-6570
>
> I also left in the call to saveAsParquetFile(), as it produced a 
> similar exception (though there was no use of explode there).
>
> On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian <lian.cs.zju@gmail.com> wrote:
>
>     This should be a bug in Explode.eval(), which always assumes the
>     underlying SQL array is represented by a Scala Seq. Would you mind
>     opening a JIRA ticket for this? Thanks!
>
>     Cheng
>
>     On 3/27/15 7:00 PM, Jon Chase wrote:
>>     [...]


Re: Spark SQL "lateral view explode" doesn't work, and unable to save array types to Parquet

Posted by Jon Chase <jo...@gmail.com>.
https://issues.apache.org/jira/browse/SPARK-6570

I also left in the call to saveAsParquetFile(), as it produced a similar
exception (though there was no use of explode there).

On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian <li...@gmail.com> wrote:

> This should be a bug in Explode.eval(), which always assumes the
> underlying SQL array is represented by a Scala Seq. Would you mind opening
> a JIRA ticket for this? Thanks!
>
> Cheng
>
> On 3/27/15 7:00 PM, Jon Chase wrote:
>
> [...]

Re: Spark SQL "lateral view explode" doesn't work, and unable to save array types to Parquet

Posted by Cheng Lian <li...@gmail.com>.
This should be a bug in Explode.eval(), which always assumes the
underlying SQL array is represented by a Scala Seq. Would you mind
opening a JIRA ticket for this? Thanks!

Cheng
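
To make the mismatch concrete: the Java test stores an int[] in the row, while Explode.eval() (per the diagnosis above) casts the column value straight to a Seq. Any fix has to normalize the possible array representations before that cast. The sketch below only illustrates that normalization step in Java; it is not Spark's actual patch, and the class and method names are invented:

    import java.util.Arrays;
    import java.util.List;
    import scala.collection.JavaConversions;
    import scala.collection.Seq;

    // Illustration only -- not Spark's real fix for SPARK-6570. Shows the
    // kind of normalization a Seq-assuming consumer would need in order to
    // tolerate the other common array representations.
    public final class ArrayNormalizer {
        @SuppressWarnings("unchecked")
        static Seq<Object> toSeq(Object value) {
            if (value instanceof Seq) {
                return (Seq<Object>) value;        // already what the cast expects
            }
            if (value instanceof int[]) {          // primitive arrays report as "[I"
                int[] ints = (int[]) value;
                Object[] boxed = new Object[ints.length];
                for (int i = 0; i < ints.length; i++) {
                    boxed[i] = ints[i];
                }
                return JavaConversions.asScalaBuffer(Arrays.asList(boxed));
            }
            if (value instanceof Object[]) {
                return JavaConversions.asScalaBuffer(Arrays.asList((Object[]) value));
            }
            if (value instanceof List) {
                return JavaConversions.asScalaBuffer((List<Object>) value);
            }
            throw new IllegalArgumentException(
                    "Unsupported array representation: " + value.getClass().getName());
        }
    }

A generator like explode would then call toSeq(value) instead of the raw cast; equivalently, the conversion could happen once when the DataFrame is created from Java rows.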

On 3/27/15 7:00 PM, Jon Chase wrote:
> [...]