Posted to user@spark.apache.org by unk1102 <um...@gmail.com> on 2015/07/14 17:41:50 UTC

How to maintain multiple JavaRDD created within another method like javaStreamRDD.forEachRDD

I use Spark Streaming, where messages read from Kafka topics are stored in a
JavaDStream<String>; this DStream contains the actual data. After going
through the documentation and other help, I found that we traverse a
JavaDStream using foreachRDD:

javaDStreamRdd.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) {
        // call mapPartitions on the incoming RDD and build a new JavaRDD<MyTable>
        JavaRDD<MyTable> rdd_records = rdd.mapPartitions(
            new FlatMapFunction<Iterator<String>, MyTable>() {
                public Iterable<MyTable> call(Iterator<String> stringIterator)
                        throws Exception {
                    // build one MyTable per partition, appending one Record per line
                    MyTable table = new MyTable();
                    while (stringIterator.hasNext()) {
                        String[] fields = stringIterator.next().split(",");
                        Record record = new Record(fields); // create Record from the fields
                        table.append(record);
                    }
                    return Collections.singletonList(table);
                }
            });
        return null;
    }
});

Now my question is: how does the above code work? I want to create a
JavaRDD<MyTable> for each RDD of the JavaDStream. How do I make sure the
above code handles all the data, so that JavaRDD<MyTable> ends up containing
all of it and doesn't lose earlier batches because JavaRDD<MyTable> is local?

It is like calling a lambda function within a lambda function. How do I
make sure the local JavaRDD variable ends up containing the data from all
the RDDs?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-maintain-multiple-JavaRDD-created-within-another-method-like-javaStreamRDD-forEachRDD-tp23832.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to maintain multiple JavaRDD created within another method like javaStreamRDD.forEachRDD

Posted by Jong Wook Kim <jo...@nyu.edu>.
Your question is not very clear, but from what I understand, you want to work with a stream of MyTable values built from the records parsed out of your Kafka topics.

What you need is a JavaDStream<MyTable>, and you can use transform() <http://spark.apache.org/docs/latest/api/java/org/apache/spark/streaming/api/java/JavaDStreamLike.html#transform(org.apache.spark.api.java.function.Function)> to make one.

It takes a function that accepts an RDD and returns an RDD, as opposed to foreachRDD, whose function returns Void, as in your code.
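
For example, a minimal sketch of that approach (kafkaStream here is a stand-in for your input DStream, and the Record(String[]) constructor is assumed; MyTable and Record are your own types):

import java.util.Collections;
import java.util.Iterator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;

// transform() maps each micro-batch JavaRDD<String> to a JavaRDD<MyTable>,
// giving you a JavaDStream<MyTable> instead of a throwaway local variable.
JavaDStream<MyTable> tableStream = kafkaStream.transform(
    new Function<JavaRDD<String>, JavaRDD<MyTable>>() {
        public JavaRDD<MyTable> call(JavaRDD<String> rdd) {
            return rdd.mapPartitions(
                new FlatMapFunction<Iterator<String>, MyTable>() {
                    public Iterable<MyTable> call(Iterator<String> lines) {
                        MyTable table = new MyTable();        // one table per partition
                        while (lines.hasNext()) {
                            String[] fields = lines.next().split(",");
                            table.append(new Record(fields)); // assumed Record(String[]) ctor
                        }
                        return Collections.singletonList(table);
                    }
                });
        }
    });

Each micro-batch then goes through the same parsing logic, and you can keep chaining operations on tableStream (e.g. tableStream.foreachRDD(...)) without holding RDDs in a local variable.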

PS. I wouldn't name a JavaDStream "javaDStreamRdd": first of all, it is not an RDD, and the name should be more specific about what it contains.


Jong Wook.

