You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Andy Davidson <An...@SantaCruzIntegration.com> on 2016/01/29 22:54:41 UTC

How to use DStream reparation() ?

My Streaming app has a requirement that my output be saved in the smallest
number of file possible such that each file does not exceed a max number of
rows. Based on my experience it appears that each partition will be written
to separate output file.

This was really easy to do in my batch processing using data frames and RDD.
Its easy to call count() and then decide how many partitions I want and
finally call repartition().

I am having heck of time trying to figure out to do the same thing using
spark streaming. 


JavaDStream<Pojo> tidy = Š

JavaDStream<Long> counts = tidy.count();



Bellow is the documentation for count. I do not see how I can use this to
figure out how many partitions I need? Stream does not provide a collect().
foreachRDD() can not return a value. I tried using an accumulator but that
did not work



Any suggestions would be greatly appreciated


http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/str
eaming/api/java/JavaDStream.html
count
JavaDStream 
<http://spark.apache.org/docs/latest/api/java/org/apache/spark/streaming/api
/java/JavaDStream.html> <java.lang.Long> count()
Return a new DStream in which each RDD has a single element generated by
counting each RDD of this DStream.
Returns:(undocumented)




Re: How to use DStream reparation() ?

Posted by Andy Davidson <An...@SantaCruzIntegration.com>.
The following code seems to do what I want. I repartition on RDD not
DStreams. I wonder if this has to do with the way windows work?

   private static void saveTweetsCSV(JavaSparkContext jsc,
JavaDStream<TidyPojo> tidy, String outputURI) {

        tidy.foreachRDD(new VoidFunction2<JavaRDD< TidyPojo >, Time> () {

            private static final long serialVersionUID = 1L;

            // typically we use the CSV file format for data a human needs
to work with

            // We want to repartition the data so that we write the smallest
number

            // of files possible how ever the max number of rows in a given
csv

            // file is small enough for a human to work with easily.

            final long maxNumRowsPerFile = 100;



            @Override

            public void call(JavaRDD<TidyPojo> rdd, Time time) throws
Exception {

                long count = rdd.count();

                //if(!rdd.isEmpty()) {

                if (count > 0) {

                    long numPartisions = count / maxNumRowsPerFile + 1;

                    Long tmp = numPartisions;

                    rdd = rdd.repartition(tmp.intValue());

                    String dirPath = outputURI + "_CSV" + "-" +
time.milliseconds();

                    //
http://spark.apache.org/docs/latest/streaming-programming-guide.html#datafra
me-and-sql-operations

                    // Get the singleton instance of SQLContext

                    SQLContext sqlContext =
SQLContext.getOrCreate(rdd.context());

                   

                    DataFrame df = sqlContext.createDataFrame(rdd,
TidyTwitterMLPojo.class);

                    TidyPojo.saveCSV(df, dirPath);

                }  

            }

        });

    }


From:  Andrew Davidson <An...@SantaCruzIntegration.com>
Date:  Friday, January 29, 2016 at 1:54 PM
To:  "user @spark" <us...@spark.apache.org>
Subject:  How to use DStream<T> reparation() ?

> My Streaming app has a requirement that my output be saved in the smallest
> number of file possible such that each file does not exceed a max number of
> rows. Based on my experience it appears that each partition will be written to
> separate output file.
> 
> This was really easy to do in my batch processing using data frames and RDD.
> Its easy to call count() and then decide how many partitions I want and
> finally call repartition().
> 
> I am having heck of time trying to figure out to do the same thing using spark
> streaming. 
> 
> 
> JavaDStream<Pojo> tidy = Š
> 
> JavaDStream<Long> counts = tidy.count();
> 
> 
> 
> Bellow is the documentation for count. I do not see how I can use this to
> figure out how many partitions I need? Stream does not provide a collect().
> foreachRDD() can not return a value. I tried using an accumulator but that did
> not work
> 
> 
> 
> Any suggestions would be greatly appreciated
> 
> 
> http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/strea
> ming/api/java/JavaDStream.html
> count
> JavaDStream 
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/streaming/api/j
> ava/JavaDStream.html> <java.lang.Long> count()
> Return a new DStream in which each RDD has a single element generated by
> counting each RDD of this DStream.
> Returns:(undocumented)
>