Posted to user@spark.apache.org by unk1102 <um...@gmail.com> on 2015/07/14 17:41:50 UTC
How to maintain multiple JavaRDD created within another method like javaStreamRDD.forEachRDD
I use Spark Streaming, where messages read from Kafka topics are stored in a
JavaDStream<String>; this stream contains the actual data. After going through
the documentation and other resources, I found that we traverse a JavaDStream
using foreachRDD:
javaDStreamRdd.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) {
        // now I want to call mapPartitions on the above rdd and generate a
        // new JavaRDD<MyTable>
        JavaRDD<MyTable> rddRecords = rdd.mapPartitions(
            new FlatMapFunction<Iterator<String>, MyTable>() {
                public Iterable<MyTable> call(Iterator<String> stringIterator)
                        throws Exception {
                    // build one MyTable per partition from the parsed lines
                    MyTable table = new MyTable();
                    while (stringIterator.hasNext()) {
                        String line = stringIterator.next();
                        String[] fields = line.split(",");
                        Record record = new Record(fields); // create Record from fields
                        table.append(record);
                    }
                    return Collections.singletonList(table);
                }
            });
        return null;
    }
});
Now my question is: how does the above code work? I want to create a
JavaRDD<MyTable> for each RDD of the JavaDStream. How do I make sure the above
code works with all the data, and that JavaRDD<MyTable> ends up containing all
of it without losing any previous data, given that the JavaRDD<MyTable> is a
local variable? It is like calling a lambda function within a lambda function.
How do I make sure the local JavaRDD variable ends up holding the data from all
the RDDs?
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-maintain-multiple-JavaRDD-created-within-another-method-like-javaStreamRDD-forEachRDD-tp23832.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: How to maintain multiple JavaRDD created within another method like javaStreamRDD.forEachRDD
Posted by Jong Wook Kim <jo...@nyu.edu>.
Your question is not very clear, but from what I understand, you want to deal with a stream of MyTable that has parsed records from your Kafka topics.
What you need is JavaDStream<MyTable>, and you can use transform() <http://spark.apache.org/docs/latest/api/java/org/apache/spark/streaming/api/java/JavaDStreamLike.html#transform(org.apache.spark.api.java.function.Function)> to make one.
It accepts a function that takes an RDD and returns an RDD, as opposed to foreachRDD, whose argument returns Void, as in your code.
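From what you describe, a minimal sketch of the transform() approach could look like the following. MyTable, the parsing logic, and the class and method names here are hypothetical stand-ins for your own types, and the Spark 1.x Java API is assumed (since Spark 2.0, FlatMapFunction returns an Iterator rather than an Iterable):

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaDStream;

public class KafkaLinesToTables {

    // Hypothetical stand-in for the MyTable type in your post.
    public static class MyTable implements Serializable {
        private final List<String[]> rows = new ArrayList<>();
        public MyTable append(String[] fields) { rows.add(fields); return this; }
        public int size() { return rows.size(); }
    }

    // Pure parsing step, kept separate so it can be tested without Spark.
    public static String[] parseLine(String line) {
        return line.split(",");
    }

    // Turn each micro-batch of CSV lines into a stream of MyTable,
    // building one MyTable per partition.
    public static JavaDStream<MyTable> toTables(JavaDStream<String> lines) {
        return lines.transform((JavaRDD<String> rdd) ->
            rdd.mapPartitions(it -> {
                MyTable table = new MyTable();
                while (it.hasNext()) {
                    table.append(parseLine(it.next()));
                }
                // Spark 1.x FlatMapFunction returns an Iterable;
                // on Spark 2.0+ return Arrays.asList(table).iterator() instead.
                return Arrays.asList(table);
            })
        );
    }
}
```

The resulting JavaDStream<MyTable> then carries the parsed data for every batch; whatever you do downstream (print, save, further transforms) sees each batch's tables without any local accumulation on your side.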
PS. I wouldn't name a JavaDStream "javaDStreamRdd"; first of all, it is not an RDD, and the name should say something more specific about what it contains.
Jong Wook.