Posted to user@spark.apache.org by tridib <tr...@live.com> on 2014/11/25 05:24:25 UTC

Control number of parquet generated from JavaSchemaRDD

Hello,
I am reading around 1000 input files from disk into an RDD and generating
Parquet. It always produces the same number of Parquet files as there are input
files. I tried to merge them using

rdd.coalesce(n) and/or rdd.repartition(n).
I also tried using:

        int MB_128 = 128*1024*1024;
        sc.hadoopConfiguration().setInt("dfs.blocksize", MB_128);
        sc.hadoopConfiguration().setInt("parquet.block.size", MB_128);

No luck.
Is there a way to control the size/number of parquet files generated?

Thanks
Tridib



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Control-number-of-parquet-generated-from-JavaSchemaRDD-tp19717.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: Control number of parquet generated from JavaSchemaRDD

Posted by Naveen Kumar Pokala <np...@spcapitaliq.com>.
Hi,

When submitting your Spark job, pass --executor-cores 2 --num-executors 24; it will divide the dataset into 24*2 Parquet files.

Or set spark.default.parallelism to a value like 50 on the SparkConf object. It will divide the dataset into 50 files in your HDFS.


-Naveen

-----Original Message-----
From: tridib [mailto:tridib.samanta@live.com] 
Sent: Tuesday, November 25, 2014 9:54 AM
To: user@spark.incubator.apache.org
Subject: Control number of parquet generated from JavaSchemaRDD

Hello,
I am reading around 1000 input files from disk into an RDD and generating Parquet. It always produces the same number of Parquet files as there are input files. I tried to merge them using

rdd.coalesce(n) and/or rdd.repartition(n).
I also tried using:

        int MB_128 = 128*1024*1024;
        sc.hadoopConfiguration().setInt("dfs.blocksize", MB_128);
        sc.hadoopConfiguration().setInt("parquet.block.size", MB_128);

No luck.
Is there a way to control the size/number of parquet files generated?

Thanks
Tridib





Re: Control number of parquet generated from JavaSchemaRDD

Posted by Michael Armbrust <mi...@databricks.com>.
I believe coalesce(..., true) and repartition are the same.  If the input
files are of similar sizes, then coalesce(n, false) will be cheaper as it
introduces a narrow dependency
<https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf>,
meaning there won't be a shuffle.  However, if there is a lot of skew in
the input file sizes, then a repartition will ensure that data is shuffled
evenly.

There is currently no way to control the file size other than picking a 'good'
number of partitions.
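
As a rough illustration of picking a 'good' number of partitions: since each
partition ends up as one Parquet file, one option is to divide an estimate of
the total output size by the file size you are aiming for. The sketch below is
plain Java, not a Spark API. The class name PartitionCountSketch, the method
partitionsFor, and the 10 GiB / 128 MiB figures are all hypothetical, and the
output size is something you would have to estimate yourself, because Parquet
output is usually not the same size as the input.

```java
final class PartitionCountSketch {

    /**
     * Smallest partition count such that, with evenly spread data, each
     * output file averages at most targetFileBytes. Always at least 1.
     * estimatedOutputBytes is the caller's own estimate (an assumption).
     */
    static int partitionsFor(long estimatedOutputBytes, long targetFileBytes) {
        if (estimatedOutputBytes < 0 || targetFileBytes <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative / positive");
        }
        // Ceiling division written to avoid overflow on very large inputs.
        long n = estimatedOutputBytes / targetFileBytes
                + (estimatedOutputBytes % targetFileBytes == 0 ? 0 : 1);
        return (int) Math.max(1L, Math.min(n, (long) Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        long estimatedOutput = 10L * 1024 * 1024 * 1024; // hypothetical: 10 GiB
        long targetFile = 128L * 1024 * 1024;            // hypothetical: 128 MiB
        System.out.println(partitionsFor(estimatedOutput, targetFile)); // prints 80
    }
}
```

The resulting number would then be passed to coalesce(n) or repartition(n)
before saving. With skewed data the individual files can still differ a lot
in size unless a shuffle is used.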

On Tue, Nov 25, 2014 at 11:30 AM, tridib <tr...@live.com> wrote:

> Thanks Michael,
> It worked like a charm! I have a few more queries:
> 1. Is there a way to control the size of parquet file?
> 2. Which method do you recommend coalesce(n, true), coalesce(n, false) or
> repartition(n)?
>
> Thanks & Regards
> Tridib

Re: Control number of parquet generated from JavaSchemaRDD

Posted by tridib <tr...@live.com>.
Thanks Michael,
It worked like a charm! I have a few more queries:
1. Is there a way to control the size of parquet file?
2. Which method do you recommend coalesce(n, true), coalesce(n, false) or
repartition(n)?

Thanks & Regards
Tridib






Re: Control number of parquet generated from JavaSchemaRDD

Posted by tridib <tr...@live.com>.
Ohh...how can I miss that. :(. Thanks!





Re: Control number of parquet generated from JavaSchemaRDD

Posted by Michael Armbrust <mi...@databricks.com>.
RDDs are immutable, so calling coalesce doesn't actually change the RDD but
instead returns a new RDD that has fewer partitions.  You need to save that
to a variable and call saveAsParquetFile on the new RDD.
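
A toy model of this point in plain Java. It does not use Spark, so it can be
run on its own; ToyRdd, its coalesce and filesWritten are hypothetical
stand-ins for an RDD, not Spark classes. The comments show what the
corresponding change to the method from this thread would presumably look
like.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ImmutableCoalesceSketch {

    /** Hypothetical stand-in for an RDD: an immutable list of partitions. */
    static final class ToyRdd {
        private final List<List<String>> partitions;

        ToyRdd(List<List<String>> partitions) {
            List<List<String>> copy = new ArrayList<List<String>>();
            for (List<String> p : partitions) {
                copy.add(Collections.unmodifiableList(new ArrayList<String>(p)));
            }
            this.partitions = Collections.unmodifiableList(copy);
        }

        /** Returns a NEW ToyRdd with at most n partitions; this object is not changed. */
        ToyRdd coalesce(int n) {
            if (n <= 0) {
                throw new IllegalArgumentException("n must be positive");
            }
            int target = Math.min(n, partitions.size());
            List<List<String>> merged = new ArrayList<List<String>>();
            for (int i = 0; i < target; i++) {
                merged.add(new ArrayList<String>());
            }
            // Fold the old partitions into the smaller set, round-robin.
            for (int i = 0; i < partitions.size(); i++) {
                merged.get(i % target).addAll(partitions.get(i));
            }
            return new ToyRdd(merged);
        }

        /** Models saving: one output file per partition. */
        int filesWritten() {
            return partitions.size();
        }
    }

    public static void main(String[] args) {
        // Two input files, so two partitions, as in the experiment in this thread.
        ToyRdd rdd = new ToyRdd(Arrays.asList(
                Arrays.asList("claim-1"), Arrays.asList("claim-2")));

        rdd.coalesce(1);                           // result discarded, rdd unchanged
        System.out.println(rdd.filesWritten());    // prints 2

        // Keep the returned object and save that one instead. In the thread's code:
        //   JavaSchemaRDD merged = claimSchemaRdd.coalesce(1, true);
        //   merged.saveAsParquetFile(parquetPath);
        ToyRdd merged = rdd.coalesce(1);
        System.out.println(merged.filesWritten()); // prints 1
    }
}
```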

On Tue, Nov 25, 2014 at 10:07 AM, tridib <tr...@live.com> wrote:

>     public void generateParquet(JavaSparkContext sc, String jsonFilePath, String parquetPath) {
>         //int MB_128 = 128*1024*1024;
>         //sc.hadoopConfiguration().setInt("dfs.blocksize", MB_128);
>         //sc.hadoopConfiguration().setInt("parquet.block.size", MB_128);
>         JavaSQLContext sqlCtx = new JavaSQLContext(sc);
>         JavaRDD<Claim> claimRdd = sc.textFile(jsonFilePath).map(new StringToClaimMapper()).filter(new NullFilter());
>         JavaSchemaRDD claimSchemaRdd = sqlCtx.applySchema(claimRdd, Claim.class);
>         claimSchemaRdd.coalesce(1, true); //tried with false also. Tried repartition(1) too.
>
>         claimSchemaRdd.saveAsParquetFile(parquetPath);
>     }

Re: Control number of parquet generated from JavaSchemaRDD

Posted by tridib <tr...@live.com>.
    public void generateParquet(JavaSparkContext sc, String jsonFilePath, String parquetPath) {
        //int MB_128 = 128*1024*1024;
        //sc.hadoopConfiguration().setInt("dfs.blocksize", MB_128);
        //sc.hadoopConfiguration().setInt("parquet.block.size", MB_128);
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaRDD<Claim> claimRdd = sc.textFile(jsonFilePath).map(new StringToClaimMapper()).filter(new NullFilter());
        JavaSchemaRDD claimSchemaRdd = sqlCtx.applySchema(claimRdd, Claim.class);
        claimSchemaRdd.coalesce(1, true); //tried with false also. Tried repartition(1) too.

        claimSchemaRdd.saveAsParquetFile(parquetPath);
    }





Re: Control number of parquet generated from JavaSchemaRDD

Posted by tridib <tr...@live.com>.
I am experimenting with two files and trying to generate 1 parquet file.

public class CompactParquetGenerator implements Serializable {

    public void generateParquet(JavaSparkContext sc, String jsonFilePath, String parquetPath) {
        //int MB_128 = 128*1024*1024;
        //sc.hadoopConfiguration().setInt("dfs.blocksize", MB_128);
        //sc.hadoopConfiguration().setInt("parquet.block.size", MB_128);
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaRDD<Claim> claimRdd = sc.textFile(jsonFilePath).map(new StringToClaimMapper()).filter(new NullFilter());
        JavaSchemaRDD claimSchemaRdd = sqlCtx.applySchema(claimRdd, Claim.class);
        claimSchemaRdd.coalesce(1);
        claimSchemaRdd.saveAsParquetFile(parquetPath);
    }
}





Re: Control number of parquet generated from JavaSchemaRDD

Posted by Michael Armbrust <mi...@databricks.com>.
repartition and coalesce should both allow you to achieve what you
describe.  Can you maybe share the code that is not working?

On Mon, Nov 24, 2014 at 8:24 PM, tridib <tr...@live.com> wrote:

> Hello,
> I am reading around 1000 input files from disk into an RDD and generating
> Parquet. It always produces the same number of Parquet files as there are input
> files. I tried to merge them using
>
> rdd.coalesce(n) and/or rdd.repartition(n).
> I also tried using:
>
>         int MB_128 = 128*1024*1024;
>         sc.hadoopConfiguration().setInt("dfs.blocksize", MB_128);
>         sc.hadoopConfiguration().setInt("parquet.block.size", MB_128);
>
> No luck.
> Is there a way to control the size/number of parquet files generated?
>
> Thanks
> Tridib