You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Jon Chase <jo...@gmail.com> on 2015/03/27 12:00:24 UTC
Spark SQL "lateral view explode" doesn't work, and unable to save
array types to Parquet
Spark 1.3.0
Two issues:
a) I'm unable to get a "lateral view explode" query to work on an array type
b) I'm unable to save an array type to a Parquet file
I keep running into this:
java.lang.ClassCastException: [I cannot be cast to
scala.collection.Seq
Here's a stack trace from the explode issue:
root
|-- col1: string (nullable = false)
|-- col2s: array (nullable = true)
| |-- element: integer (containsNull = true)
ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0
(TID 15)
java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
at
org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
~[spark-sql_2.10-1.3.0.jar:1.3.0]
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
~[spark-sql_2.10-1.3.0.jar:1.3.0]
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
~[scala-library-2.10.4.jar:na]
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
~[scala-library-2.10.4.jar:na]
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
~[scala-library-2.10.4.jar:na]
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
~[scala-library-2.10.4.jar:na]
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
~[scala-library-2.10.4.jar:na]
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
~[scala-library-2.10.4.jar:na]
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.scheduler.Task.run(Task.scala:64)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
~[spark-core_2.10-1.3.0.jar:1.3.0]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[na:1.8.0_31]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[na:1.8.0_31]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID
15, localhost): java.lang.ClassCastException: [I cannot be cast to
scala.collection.Seq
at
org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Maybe I'm defining the schema incorrectly?
This test demonstrates both issues:
@Rule
public TemporaryFolder tmp = new TemporaryFolder();
@Test
public void testPercentileWithExplode() throws Exception {
StructType schema = DataTypes.createStructType(Lists.newArrayList(
DataTypes.createStructField("col1", DataTypes.StringType,
false),
DataTypes.createStructField("col2s",
DataTypes.createArrayType(DataTypes.IntegerType, true), true)
));
JavaRDD<Row> rowRDD = sc.parallelize(Lists.newArrayList(
RowFactory.create("test", new int[]{1, 2, 3})
));
DataFrame df = sql.createDataFrame(rowRDD, schema);
df.registerTempTable("df");
df.printSchema();
List<int[]> ints = sql.sql("select col2s from df").javaRDD()
.map(row -> (int[]) row.get(0)).collect();
assertEquals(1, ints.size());
assertArrayEquals(new int[]{1, 2, 3}, ints.get(0));
// fails: lateral view explode does not work:
// java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
List<Integer> explodedInts = sql.sql("select col2 from df lateral view explode(col2s) splode as col2").javaRDD()
        .map(row -> row.getInt(0)).collect();
assertEquals(3, explodedInts.size());
assertEquals(Lists.newArrayList(1, 2, 3), explodedInts);
// fails: java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
df.saveAsParquetFile(tmp.getRoot().getAbsolutePath() + "/parquet");
DataFrame loadedDf = sql.load(tmp.getRoot().getAbsolutePath() +
"/parquet");
loadedDf.registerTempTable("loadedDf");
List<int[]> moreInts = sql.sql("select col2s from loadedDf").javaRDD()
        .map(row -> (int[]) row.get(0)).collect();
assertEquals(1, moreInts.size());
assertArrayEquals(new int[]{1, 2, 3}, moreInts.get(0));
}
Re: Spark SQL "lateral view explode" doesn't work, and unable to
save array types to Parquet
Posted by Cheng Lian <li...@gmail.com>.
Thanks for the detailed information!
On 3/27/15 9:16 PM, Jon Chase wrote:
> Done. I also updated the name on the ticket to include both issues.
> "Spark SQL arrays: "explode()" fails and cannot save array type to
> Parquet"
>
> https://issues.apache.org/jira/browse/SPARK-6570
>
> On Fri, Mar 27, 2015 at 8:14 AM, Cheng Lian <lian.cs.zju@gmail.com
> <ma...@gmail.com>> wrote:
>
> Forgot to mention that, would you mind to also provide the full
> stack trace of the exception thrown in the saveAsParquetFile call?
> Thanks!
>
> Cheng
>
> On 3/27/15 7:35 PM, Jon Chase wrote:
>> https://issues.apache.org/jira/browse/SPARK-6570
>>
>> I also left in the call to saveAsParquetFile(), as it produced a
>> similar exception (though there was no use of explode there).
>>
>> On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian
>> <lian.cs.zju@gmail.com <ma...@gmail.com>> wrote:
>>
>> This should be a bug in the Explode.eval(), which always
>> assumes the underlying SQL array is represented by a Scala
>> Seq. Would you mind to open a JIRA ticket for this? Thanks!
>>
>> Cheng
>>
>> On 3/27/15 7:00 PM, Jon Chase wrote:
>>> Spark 1.3.0
>>>
>>> Two issues:
>>>
>>> a) I'm unable to get a "lateral view explode" query to work
>>> on an array type
>>> b) I'm unable to save an array type to a Parquet file
>>>
>>> I keep running into this:
>>>
>>> java.lang.ClassCastException: [I cannot be cast to
>>> scala.collection.Seq
>>>
>>> Here's a stack trace from the explode issue:
>>>
>>> root
>>> |-- col1: string (nullable = false)
>>> |-- col2s: array (nullable = true)
>>> | |-- element: integer (containsNull = true)
>>>
>>> ERROR org.apache.spark.executor.Executor Exception in task
>>> 7.0 in stage 1.0 (TID 15)
>>> java.lang.ClassCastException: [I cannot be cast to
>>> scala.collection.Seq
>>> at
>>> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
>>> ~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
>>> at
>>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
>>> ~[spark-sql_2.10-1.3.0.jar:1.3.0]
>>> at
>>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
>>> ~[spark-sql_2.10-1.3.0.jar:1.3.0]
>>> at
>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>> ~[scala-library-2.10.4.jar:na]
>>> at scala.collection.TraversableOnce$class.to
>>> <http://class.to>(TraversableOnce.scala:273)
>>> ~[scala-library-2.10.4.jar:na]
>>> at scala.collection.AbstractIterator.to
>>> <http://scala.collection.AbstractIterator.to>(Iterator.scala:1157)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>> ~[scala-library-2.10.4.jar:na]
>>> at
>>> scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>> ~[scala-library-2.10.4.jar:na]
>>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>>> at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>>> at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>>> at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> [na:1.8.0_31]
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> [na:1.8.0_31]
>>> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
>>> WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in
>>> stage 1.0 (TID 15, localhost): java.lang.ClassCastException:
>>> [I cannot be cast to scala.collection.Seq
>>> at
>>> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
>>> at
>>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
>>> at
>>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
>>> at
>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at
>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at
>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at
>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> at
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>> at scala.collection.TraversableOnce$class.to
>>> <http://class.to>(TraversableOnce.scala:273)
>>> at scala.collection.AbstractIterator.to
>>> <http://scala.collection.AbstractIterator.to>(Iterator.scala:1157)
>>> at
>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>> at
>>> scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>> at
>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>> at
>>> scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>>> at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>> Maybe I'm defining the schema incorrectly?
>>>
>>>
>>>
>>> This test demonstrates both issues:
>>>
>>> @Rule
>>> public TemporaryFolder tmp = new TemporaryFolder();
>>>
>>> @Test
>>> public void testPercentileWithExplode() throws Exception {
>>> StructType schema =
>>> DataTypes.createStructType(Lists.newArrayList(
>>> DataTypes.createStructField("col1", DataTypes.StringType,
>>> false),
>>> DataTypes.createStructField("col2s",
>>> DataTypes.createArrayType(DataTypes.IntegerType, true), true)
>>> ));
>>>
>>> JavaRDD<Row> rowRDD = sc.parallelize(Lists.newArrayList(
>>> RowFactory.create("test", new int[]{1, 2, 3})
>>> ));
>>>
>>> DataFrame df = sql.createDataFrame(rowRDD, schema);
>>> df.registerTempTable("df");
>>> df.printSchema();
>>>
>>> List<int[]> ints = sql.sql("select col2s from
>>> df").javaRDD()
>>> .map(row -> (int[]) row.get(0)).collect();
>>> assertEquals(1, ints.size());
>>> assertArrayEquals(new int[]{1, 2, 3}, ints.get(0));
>>>
>>>
>>> // fails: lateral view explode does not work:
>>> java.lang.ClassCastException: [I cannot be cast to
>>> scala.collection.Seq
>>> List<Integer> explodedInts = sql.sql("select col2
>>> from df lateral view explode(col2s) splode as col2").javaRDD()
>>> .map(row -> row.getInt(0)).collect();
>>> assertEquals(3, explodedInts.size());
>>> assertEquals(Lists.newArrayList(1, 2, 3), explodedInts);
>>>
>>>
>>> // fails: java.lang.ClassCastException: [I cannot be
>>> cast to scala.collection.Seq
>>> df.saveAsParquetFile(tmp.getRoot().getAbsolutePath() +
>>> "/parquet");
>>>
>>>
>>> DataFrame loadedDf =
>>> sql.load(tmp.getRoot().getAbsolutePath() + "/parquet");
>>> loadedDf.registerTempTable("loadedDf");
>>> List<int[]> moreInts = sql.sql("select col2s from
>>> loadedDf").javaRDD()
>>> .map(row -> (int[]) row.get(0)).collect();
>>> assertEquals(1, moreInts.size());
>>> assertArrayEquals(new int[]{1, 2, 3}, moreInts.get(0));
>>> }
>>>
>>
>>
>
>
Re: Spark SQL "lateral view explode" doesn't work, and unable to save
array types to Parquet
Posted by Jon Chase <jo...@gmail.com>.
Done. I also updated the name on the ticket to include both issues.
"Spark SQL arrays: "explode()" fails and cannot save array type to Parquet"
https://issues.apache.org/jira/browse/SPARK-6570
On Fri, Mar 27, 2015 at 8:14 AM, Cheng Lian <li...@gmail.com> wrote:
> Forgot to mention that, would you mind to also provide the full stack
> trace of the exception thrown in the saveAsParquetFile call? Thanks!
>
> Cheng
>
> On 3/27/15 7:35 PM, Jon Chase wrote:
>
> https://issues.apache.org/jira/browse/SPARK-6570
>
> I also left in the call to saveAsParquetFile(), as it produced a similar
> exception (though there was no use of explode there).
>
> On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian <li...@gmail.com> wrote:
>
>> This should be a bug in the Explode.eval(), which always assumes the
>> underlying SQL array is represented by a Scala Seq. Would you mind to open
>> a JIRA ticket for this? Thanks!
>>
>> Cheng
>>
>> On 3/27/15 7:00 PM, Jon Chase wrote:
>>
>> Spark 1.3.0
>>
>> Two issues:
>>
>> a) I'm unable to get a "lateral view explode" query to work on an array
>> type
>> b) I'm unable to save an array type to a Parquet file
>>
>> I keep running into this:
>>
>> java.lang.ClassCastException: [I cannot be cast to
>> scala.collection.Seq
>>
>> Here's a stack trace from the explode issue:
>>
>> root
>> |-- col1: string (nullable = false)
>> |-- col2s: array (nullable = true)
>> | |-- element: integer (containsNull = true)
>>
>> ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage
>> 1.0 (TID 15)
>> java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
>> at
>> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
>> ~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
>> at
>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
>> ~[spark-sql_2.10-1.3.0.jar:1.3.0]
>> at
>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
>> ~[spark-sql_2.10-1.3.0.jar:1.3.0]
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> ~[scala-library-2.10.4.jar:na]
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> ~[scala-library-2.10.4.jar:na]
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> ~[scala-library-2.10.4.jar:na]
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>> ~[scala-library-2.10.4.jar:na]
>> at
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>> ~[scala-library-2.10.4.jar:na]
>> at
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>> ~[scala-library-2.10.4.jar:na]
>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> [na:1.8.0_31]
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> [na:1.8.0_31]
>> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
>> WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID
>> 15, localhost): java.lang.ClassCastException: [I cannot be cast to
>> scala.collection.Seq
>> at
>> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
>> at
>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
>> at
>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> Maybe I'm defining the schema incorrectly?
>>
>>
>>
>> This test demonstrates both issues:
>>
>> @Rule
>> public TemporaryFolder tmp = new TemporaryFolder();
>>
>> @Test
>> public void testPercentileWithExplode() throws Exception {
>> StructType schema = DataTypes.createStructType(Lists.newArrayList(
>> DataTypes.createStructField("col1", DataTypes.StringType,
>> false),
>> DataTypes.createStructField("col2s",
>> DataTypes.createArrayType(DataTypes.IntegerType, true), true)
>> ));
>>
>> JavaRDD<Row> rowRDD = sc.parallelize(Lists.newArrayList(
>> RowFactory.create("test", new int[]{1, 2, 3})
>> ));
>>
>> DataFrame df = sql.createDataFrame(rowRDD, schema);
>> df.registerTempTable("df");
>> df.printSchema();
>>
>> List<int[]> ints = sql.sql("select col2s from df").javaRDD()
>> .map(row -> (int[]) row.get(0)).collect();
>> assertEquals(1, ints.size());
>> assertArrayEquals(new int[]{1, 2, 3}, ints.get(0));
>>
>>
>> // fails: lateral view explode does not work:
>> java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
>> List<Integer> explodedInts = sql.sql("select col2 from df lateral
>> view explode(col2s) splode as col2").javaRDD()
>> .map(row ->
>> row.getInt(0)).collect();
>> assertEquals(3, explodedInts.size());
>> assertEquals(Lists.newArrayList(1, 2, 3), explodedInts);
>>
>>
>> // fails: java.lang.ClassCastException: [I cannot be cast to
>> scala.collection.Seq
>> df.saveAsParquetFile(tmp.getRoot().getAbsolutePath() +
>> "/parquet");
>>
>>
>> DataFrame loadedDf = sql.load(tmp.getRoot().getAbsolutePath() +
>> "/parquet");
>> loadedDf.registerTempTable("loadedDf");
>> List<int[]> moreInts = sql.sql("select col2s from
>> loadedDf").javaRDD()
>> .map(row -> (int[])
>> row.get(0)).collect();
>> assertEquals(1, moreInts.size());
>> assertArrayEquals(new int[]{1, 2, 3}, moreInts.get(0));
>> }
>>
>>
>>
>
>
Re: Spark SQL "lateral view explode" doesn't work, and unable to
save array types to Parquet
Posted by Cheng Lian <li...@gmail.com>.
Forgot to mention that, would you mind to also provide the full stack
trace of the exception thrown in the saveAsParquetFile call? Thanks!
Cheng
On 3/27/15 7:35 PM, Jon Chase wrote:
> https://issues.apache.org/jira/browse/SPARK-6570
>
> I also left in the call to saveAsParquetFile(), as it produced a
> similar exception (though there was no use of explode there).
>
> On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian <lian.cs.zju@gmail.com
> <ma...@gmail.com>> wrote:
>
> This should be a bug in the Explode.eval(), which always assumes
> the underlying SQL array is represented by a Scala Seq. Would you
> mind to open a JIRA ticket for this? Thanks!
>
> Cheng
>
> On 3/27/15 7:00 PM, Jon Chase wrote:
>> Spark 1.3.0
>>
>> Two issues:
>>
>> a) I'm unable to get a "lateral view explode" query to work on an
>> array type
>> b) I'm unable to save an array type to a Parquet file
>>
>> I keep running into this:
>>
>> java.lang.ClassCastException: [I cannot be cast to
>> scala.collection.Seq
>>
>> Here's a stack trace from the explode issue:
>>
>> root
>> |-- col1: string (nullable = false)
>> |-- col2s: array (nullable = true)
>> | |-- element: integer (containsNull = true)
>>
>> ERROR org.apache.spark.executor.Executor Exception in task 7.0 in
>> stage 1.0 (TID 15)
>> java.lang.ClassCastException: [I cannot be cast to
>> scala.collection.Seq
>> at
>> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
>> ~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
>> at
>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
>> ~[spark-sql_2.10-1.3.0.jar:1.3.0]
>> at
>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
>> ~[spark-sql_2.10-1.3.0.jar:1.3.0]
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> ~[scala-library-2.10.4.jar:na]
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> ~[scala-library-2.10.4.jar:na]
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> ~[scala-library-2.10.4.jar:na]
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.TraversableOnce$class.to
>> <http://class.to>(TraversableOnce.scala:273)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.AbstractIterator.to
>> <http://scala.collection.AbstractIterator.to>(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
>> at
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>> ~[scala-library-2.10.4.jar:na]
>> at
>> scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>> ~[scala-library-2.10.4.jar:na]
>> at
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>> ~[scala-library-2.10.4.jar:na]
>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>> ~[scala-library-2.10.4.jar:na]
>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> ~[spark-core_2.10-1.3.0.jar:1.3.0]
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> [na:1.8.0_31]
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> [na:1.8.0_31]
>> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
>> WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage
>> 1.0 (TID 15, localhost): java.lang.ClassCastException: [I cannot
>> be cast to scala.collection.Seq
>> at
>> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
>> at
>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
>> at
>> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>> at scala.collection.TraversableOnce$class.to
>> <http://class.to>(TraversableOnce.scala:273)
>> at scala.collection.AbstractIterator.to
>> <http://scala.collection.AbstractIterator.to>(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> Maybe I'm defining the schema incorrectly?
>>
>>
>>
>> This test demonstrates both issues:
>>
>> @Rule
>> public TemporaryFolder tmp = new TemporaryFolder();
>>
>> @Test
>> public void testPercentileWithExplode() throws Exception {
>> StructType schema =
>> DataTypes.createStructType(Lists.newArrayList(
>> DataTypes.createStructField("col1", DataTypes.StringType, false),
>> DataTypes.createStructField("col2s",
>> DataTypes.createArrayType(DataTypes.IntegerType, true), true)
>> ));
>>
>> JavaRDD<Row> rowRDD = sc.parallelize(Lists.newArrayList(
>> RowFactory.create("test", new int[]{1, 2, 3})
>> ));
>>
>> DataFrame df = sql.createDataFrame(rowRDD, schema);
>> df.registerTempTable("df");
>> df.printSchema();
>>
>> List<int[]> ints = sql.sql("select col2s from df").javaRDD()
>> .map(row -> (int[])
>> row.get(0)).collect();
>> assertEquals(1, ints.size());
>> assertArrayEquals(new int[]{1, 2, 3}, ints.get(0));
>>
>>
>> // fails: lateral view explode does not work:
>> java.lang.ClassCastException: [I cannot be cast to
>> scala.collection.Seq
>> List<Integer> explodedInts = sql.sql("select col2 from df
>> lateral view explode(col2s) splode as col2").javaRDD()
>> .map(row -> row.getInt(0)).collect();
>> assertEquals(3, explodedInts.size());
>> assertEquals(Lists.newArrayList(1, 2, 3), explodedInts);
>>
>>
>> // fails: java.lang.ClassCastException: [I cannot be cast
>> to scala.collection.Seq
>> df.saveAsParquetFile(tmp.getRoot().getAbsolutePath() + "/parquet");
>>
>>
>> DataFrame loadedDf =
>> sql.load(tmp.getRoot().getAbsolutePath() + "/parquet");
>> loadedDf.registerTempTable("loadedDf");
>> List<int[]> moreInts = sql.sql("select col2s from
>> loadedDf").javaRDD()
>> .map(row -> (int[])
>> row.get(0)).collect();
>> assertEquals(1, moreInts.size());
>> assertArrayEquals(new int[]{1, 2, 3}, moreInts.get(0));
>> }
>>
>
>
Re: Spark SQL "lateral view explode" doesn't work, and unable to save
array types to Parquet
Posted by Jon Chase <jo...@gmail.com>.
https://issues.apache.org/jira/browse/SPARK-6570
I also left in the call to saveAsParquetFile(), as it produced a similar
exception (though there was no use of explode there).
On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian <li...@gmail.com> wrote:
> This should be a bug in Explode.eval(), which always assumes the
> underlying SQL array is represented by a Scala Seq. Would you mind opening
> a JIRA ticket for this? Thanks!
>
> Cheng
>
> On 3/27/15 7:00 PM, Jon Chase wrote:
>
> Spark 1.3.0
>
> Two issues:
>
> a) I'm unable to get a "lateral view explode" query to work on an array
> type
> b) I'm unable to save an array type to a Parquet file
>
> I keep running into this:
>
> java.lang.ClassCastException: [I cannot be cast to
> scala.collection.Seq
>
> Here's a stack trace from the explode issue:
>
> root
> |-- col1: string (nullable = false)
> |-- col2s: array (nullable = true)
> | |-- element: integer (containsNull = true)
>
> ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage
> 1.0 (TID 15)
> java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
> at
> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
> ~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
> at
> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
> ~[spark-sql_2.10-1.3.0.jar:1.3.0]
> at
> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
> ~[spark-sql_2.10-1.3.0.jar:1.3.0]
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> ~[scala-library-2.10.4.jar:na]
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> ~[scala-library-2.10.4.jar:na]
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> ~[scala-library-2.10.4.jar:na]
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> ~[scala-library-2.10.4.jar:na]
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> ~[scala-library-2.10.4.jar:na]
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> ~[scala-library-2.10.4.jar:na]
> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_31]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_31]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
> WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID
> 15, localhost): java.lang.ClassCastException: [I cannot be cast to
> scala.collection.Seq
> at
> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
> at
> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
> at
> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Maybe I'm defining the schema incorrectly?
>
>
>
> This test demonstrates both issues:
>
> @Rule
> public TemporaryFolder tmp = new TemporaryFolder();
>
> @Test
> public void testPercentileWithExplode() throws Exception {
> StructType schema = DataTypes.createStructType(Lists.newArrayList(
> DataTypes.createStructField("col1", DataTypes.StringType,
> false),
> DataTypes.createStructField("col2s",
> DataTypes.createArrayType(DataTypes.IntegerType, true), true)
> ));
>
> JavaRDD<Row> rowRDD = sc.parallelize(Lists.newArrayList(
> RowFactory.create("test", new int[]{1, 2, 3})
> ));
>
> DataFrame df = sql.createDataFrame(rowRDD, schema);
> df.registerTempTable("df");
> df.printSchema();
>
> List<int[]> ints = sql.sql("select col2s from df").javaRDD()
> .map(row -> (int[]) row.get(0)).collect();
> assertEquals(1, ints.size());
> assertArrayEquals(new int[]{1, 2, 3}, ints.get(0));
>
>
> // fails: lateral view explode does not work:
> java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
> List<Integer> explodedInts = sql.sql("select col2 from df lateral
> view explode(col2s) splode as col2").javaRDD()
> .map(row ->
> row.getInt(0)).collect();
> assertEquals(3, explodedInts.size());
> assertEquals(Lists.newArrayList(1, 2, 3), explodedInts);
>
>
> // fails: java.lang.ClassCastException: [I cannot be cast to
> scala.collection.Seq
> df.saveAsParquetFile(tmp.getRoot().getAbsolutePath() + "/parquet");
>
>
> DataFrame loadedDf = sql.load(tmp.getRoot().getAbsolutePath() +
> "/parquet");
> loadedDf.registerTempTable("loadedDf");
> List<int[]> moreInts = sql.sql("select col2s from
> loadedDf").javaRDD()
> .map(row -> (int[])
> row.get(0)).collect();
> assertEquals(1, moreInts.size());
> assertArrayEquals(new int[]{1, 2, 3}, moreInts.get(0));
> }
>
>
>
Re: Spark SQL "lateral view explode" doesn't work, and unable to
save array types to Parquet
Posted by Cheng Lian <li...@gmail.com>.
This should be a bug in Explode.eval(), which always assumes the
underlying SQL array is represented by a Scala Seq. Would you mind
opening a JIRA ticket for this? Thanks!
Cheng
On 3/27/15 7:00 PM, Jon Chase wrote:
> Spark 1.3.0
>
> Two issues:
>
> a) I'm unable to get a "lateral view explode" query to work on an
> array type
> b) I'm unable to save an array type to a Parquet file
>
> I keep running into this:
>
> java.lang.ClassCastException: [I cannot be cast to
> scala.collection.Seq
>
> Here's a stack trace from the explode issue:
>
> root
> |-- col1: string (nullable = false)
> |-- col2s: array (nullable = true)
> | |-- element: integer (containsNull = true)
>
> ERROR org.apache.spark.executor.Executor Exception in task 7.0 in
> stage 1.0 (TID 15)
> java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
> at
> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
> ~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
> at
> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
> ~[spark-sql_2.10-1.3.0.jar:1.3.0]
> at
> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
> ~[spark-sql_2.10-1.3.0.jar:1.3.0]
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> ~[scala-library-2.10.4.jar:na]
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> ~[scala-library-2.10.4.jar:na]
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> ~[scala-library-2.10.4.jar:na]
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> ~[scala-library-2.10.4.jar:na]
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> ~[scala-library-2.10.4.jar:na]
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> ~[scala-library-2.10.4.jar:na]
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> ~[scala-library-2.10.4.jar:na]
> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> ~[spark-core_2.10-1.3.0.jar:1.3.0]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_31]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_31]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
> WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0
> (TID 15, localhost): java.lang.ClassCastException: [I cannot be cast
> to scala.collection.Seq
> at
> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
> at
> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
> at
> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Maybe I'm defining the schema incorrectly?
>
>
>
> This test demonstrates both issues:
>
> @Rule
> public TemporaryFolder tmp = new TemporaryFolder();
>
> @Test
> public void testPercentileWithExplode() throws Exception {
> StructType schema = DataTypes.createStructType(Lists.newArrayList(
> DataTypes.createStructField("col1",
> DataTypes.StringType, false),
> DataTypes.createStructField("col2s",
> DataTypes.createArrayType(DataTypes.IntegerType, true), true)
> ));
>
> JavaRDD<Row> rowRDD = sc.parallelize(Lists.newArrayList(
> RowFactory.create("test", new int[]{1, 2, 3})
> ));
>
> DataFrame df = sql.createDataFrame(rowRDD, schema);
> df.registerTempTable("df");
> df.printSchema();
>
> List<int[]> ints = sql.sql("select col2s from df").javaRDD()
> .map(row -> (int[]) row.get(0)).collect();
> assertEquals(1, ints.size());
> assertArrayEquals(new int[]{1, 2, 3}, ints.get(0));
>
>
> // fails: lateral view explode does not work:
> java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
> List<Integer> explodedInts = sql.sql("select col2 from df
> lateral view explode(col2s) splode as col2").javaRDD()
> .map(row ->
> row.getInt(0)).collect();
> assertEquals(3, explodedInts.size());
> assertEquals(Lists.newArrayList(1, 2, 3), explodedInts);
>
>
> // fails: java.lang.ClassCastException: [I cannot be cast to
> scala.collection.Seq
> df.saveAsParquetFile(tmp.getRoot().getAbsolutePath() + "/parquet");
>
>
> DataFrame loadedDf = sql.load(tmp.getRoot().getAbsolutePath()
> + "/parquet");
> loadedDf.registerTempTable("loadedDf");
> List<int[]> moreInts = sql.sql("select col2s from
> loadedDf").javaRDD()
> .map(row -> (int[])
> row.get(0)).collect();
> assertEquals(1, moreInts.size());
> assertArrayEquals(new int[]{1, 2, 3}, moreInts.get(0));
> }
>