Posted to user@spark.apache.org by Masf <ma...@gmail.com> on 2015/03/16 12:11:04 UTC

Parquet and repartition

Hi all.

When I specify the number of partitions and save this RDD in Parquet
format, my app fails. For example:

selectTest.coalesce(28).saveAsParquetFile("hdfs://vm-clusterOutput")

However, it works well if I store the data as text:

selectTest.coalesce(28).saveAsTextFile("hdfs://vm-clusterOutput")


My Spark version is 1.2.1.

Is this bug registered?


-- 


Regards.
Miguel Ángel

Re: Parquet and repartition

Posted by Cheng Lian <li...@gmail.com>.
Hey Masf,

I’ve created SPARK-6360
<https://issues.apache.org/jira/browse/SPARK-6360> to track this issue.
A detailed analysis is provided there. TL;DR: in Spark 1.1 and 1.2, if
a SchemaRDD contains decimal or UDT column(s), calling
saveAsParquetFile after applying any traditional RDD transformation
(e.g. repartition, coalesce, distinct, …) may trigger this issue.

Fortunately, Spark 1.3 isn’t affected as we replaced SchemaRDD with 
DataFrame, which properly handles this case.
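
To illustrate the kind of failure described above, here is a minimal
sketch in plain Scala. It is not Spark code: InternalDecimal, toInternal
and writeCell are hypothetical stand-ins for the internal decimal type
and the Parquet write path, assumed only for illustration. It shows a
writer that expects an internal decimal representation throwing a
ClassCastException when it receives a scala.math.BigDecimal, and
succeeding once the value is converted first.

```scala
// Hypothetical stand-in for an internal decimal type; NOT Spark's real class.
final case class InternalDecimal(unscaled: java.math.BigInteger, scale: Int)

object DecimalCastSketch {
  // Convert the external Scala type into the internal stand-in.
  def toInternal(d: BigDecimal): InternalDecimal =
    InternalDecimal(d.underlying.unscaledValue, d.scale)

  // Mimics a writer that assumes every decimal cell already holds the
  // internal type (assumption made for illustration only).
  def writeCell(cell: Any): String = {
    val dec = cell.asInstanceOf[InternalDecimal] // fails for BigDecimal
    s"${dec.unscaled}e-${dec.scale}"
  }

  // Returns true when writing the cell fails with a ClassCastException.
  def failsWithCast(cell: Any): Boolean =
    try { writeCell(cell); false }
    catch { case _: ClassCastException => true }

  def main(args: Array[String]): Unit = {
    val external = BigDecimal("12.34")
    println(failsWithCast(external))             // true: external type reaches the writer
    println(failsWithCast(toInternal(external))) // false: value was converted first
  }
}
```

The sketch only mirrors the shape of the error message in this thread;
the actual cause and fix are described in SPARK-6360.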

Cheng


Re: Parquet and repartition

Posted by Masf <ma...@gmail.com>.
Thanks Sean, I forgot that.

The error output is the following:

java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314)
    at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/03/16 11:30:11 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 207)
java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314)
    at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/03/16 11:30:11 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 208, localhost, ANY, 2878 bytes)
15/03/16 11:30:11 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 206, localhost): java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314)
    at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)



On Mon, Mar 16, 2015 at 12:19 PM, Sean Owen <so...@cloudera.com> wrote:

> You forgot to give any information about what "fail" means here.



-- 


Regards.
Miguel Ángel