You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Anubhav Agarwal <an...@gmail.com> on 2015/10/23 17:25:07 UTC

Improve parquet write speed to HDFS and spark.sql.execution.id is already set ERROR

I have a spark job that creates 6 million rows in RDDs. I convert the RDD
into Data-frame and write it to HDFS. Currently it takes 3 minutes to write
it to HDFS.

Here is the snippet:-
RDDList.parallelStream().forEach(mapJavaRDD -> {
                    if (mapJavaRDD != null) {
                        JavaRDD<Row> rowRDD =
mapJavaRDD.mapPartitionsWithIndex((integer, v2) -> {
                            <logical operation>
                            return new ArrayList<Row>(1).iterator();
                        }, false);

                        DataFrame dF = sqlContext.createDataFrame(rowRDD,
schema).coalesce(3);
                        synchronized (finalLock) {
                            dF.write().mode(SaveMode.Append).parquet("hdfs
location");
                        }

                });

After looking into the logs I know the following is the reason for the job
taking too long:-
                            *dF.write().mode(SaveMode.Append).parquet("hdfs
location");*

I also get the following errors due to it:-
15/10/21 21:12:30 WARN scheduler.TaskSetManager: Stage 31 contains a task
of very large size (378 KB). The maximum recommended task size is 100 KB.4
of these kind of warnings appeared.

java.lang.IllegalArgumentException: java.lang.IllegalArgumentException:
spark.sql.execution.id is already set

Re: Improve parquet write speed to HDFS and spark.sql.execution.id is already set ERROR

Posted by Anubhav Agarwal <an...@gmail.com>.
I was getting the following error without it:-
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
No lease on /.gz.parquet (inode ): File does not exist. [Lease.  Holder:
DFSClient_NONMAPREDUCE_, pendingcreates: 1]

I think that is due to deadlock.

On Tue, Nov 3, 2015 at 7:48 AM, Ted Yu <yu...@gmail.com> wrote:

> I am a bit curious: why is the synchronization on finalLock is needed ?
>
> Thanks
>
> On Oct 23, 2015, at 8:25 AM, Anubhav Agarwal <an...@gmail.com> wrote:
>
> I have a spark job that creates 6 million rows in RDDs. I convert the RDD
> into Data-frame and write it to HDFS. Currently it takes 3 minutes to write
> it to HDFS.
>
> Here is the snippet:-
> RDDList.parallelStream().forEach(mapJavaRDD -> {
>                     if (mapJavaRDD != null) {
>                         JavaRDD<Row> rowRDD =
> mapJavaRDD.mapPartitionsWithIndex((integer, v2) -> {
>                             <logical operation>
>                             return new ArrayList<Row>(1).iterator();
>                         }, false);
>
>                         DataFrame dF = sqlContext.createDataFrame(rowRDD,
> schema).coalesce(3);
>                         synchronized (finalLock) {
>                             dF.write().mode(SaveMode.Append).parquet("hdfs
> location");
>                         }
>
>                 });
>
> After looking into the logs I know the following is the reason for the job
> taking too long:-
>                             *dF.write().mode(SaveMode.Append).parquet("hdfs
> location");*
>
> I also get the following errors due to it:-
> 15/10/21 21:12:30 WARN scheduler.TaskSetManager: Stage 31 contains a task
> of very large size (378 KB). The maximum recommended task size is 100 KB.4
> of these kind of warnings appeared.
>
> java.lang.IllegalArgumentException: java.lang.IllegalArgumentException:
> spark.sql.execution.id is already set
>
>

Re: Improve parquet write speed to HDFS and spark.sql.execution.id is already set ERROR

Posted by Ted Yu <yu...@gmail.com>.
I am a bit curious: why is the synchronization on finalLock is needed ?

Thanks

> On Oct 23, 2015, at 8:25 AM, Anubhav Agarwal <an...@gmail.com> wrote:
> 
> I have a spark job that creates 6 million rows in RDDs. I convert the RDD into Data-frame and write it to HDFS. Currently it takes 3 minutes to write it to HDFS.
> 
> Here is the snippet:-
> RDDList.parallelStream().forEach(mapJavaRDD -> {
>                     if (mapJavaRDD != null) {
>                         JavaRDD<Row> rowRDD = mapJavaRDD.mapPartitionsWithIndex((integer, v2) -> {
>                             <logical operation>
>                             return new ArrayList<Row>(1).iterator();
>                         }, false);
> 
>                         DataFrame dF = sqlContext.createDataFrame(rowRDD, schema).coalesce(3);
>                         synchronized (finalLock) {
>                             dF.write().mode(SaveMode.Append).parquet("hdfs location");
>                         }
> 
>                 });
> 
> After looking into the logs I know the following is the reason for the job taking too long:-
>                             dF.write().mode(SaveMode.Append).parquet("hdfs location");
> 
> I also get the following errors due to it:-
> 15/10/21 21:12:30 WARN scheduler.TaskSetManager: Stage 31 contains a task of very large size (378 KB). The maximum recommended task size is 100 KB.4 of these kind of warnings appeared.
> 
> java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: spark.sql.execution.id is already set