Posted to issues@spark.apache.org by "Tathagata Das (JIRA)" <ji...@apache.org> on 2015/01/23 07:24:34 UTC

[jira] [Commented] (SPARK-3876) Doing a RDD map/reduce within a DStream map fails with a high enough input rate

    [ https://issues.apache.org/jira/browse/SPARK-3876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14288851#comment-14288851 ] 

Tathagata Das commented on SPARK-3876:
--------------------------------------

This seems to be a fundamentally incorrect computation to do with RDDs. You cannot refer to another RDD inside anotherRDD.map(). If you have to combine information between two RDDs, there are a couple of ways of doing it.

1. Use multiple RDD ops like join, cogroup, cartesian, etc. 
2. Collect and broadcast out the information of one RDD and then use the broadcast variable in the map (see Spark programming guide for broadcast variables). 
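
As a rough illustration of option 2, the reporter's snippet could be restructured along the following lines. This is only a sketch, not a tested fix for the reported application: the class name BroadcastLengthSum and the helpers totalLength and sumLengths are made up for this example, it uses Java 8 lambdas, and it assumes objRdd is small enough to collect on the driver. The NullPointerException in RDD.map is most likely a symptom of the original pattern, since an RDD deserialized inside a task no longer has a usable SparkContext.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.api.java.JavaDStream;

// Same helper class as in the report.
class LengthGetter implements Serializable {
    public int getStrLength(String s) {
        return s.length();
    }
}

// Hypothetical class name, for illustration only.
class BroadcastLengthSum {

    // Plain Java: apply every LengthGetter to the input and sum the results.
    // This replaces the nested objRdd.map(...).reduce(...) from the report.
    static int totalLength(List<LengthGetter> getters, String input) {
        int sum = 0;
        for (LengthGetter lg : getters) {
            sum += lg.getStrLength(input);
        }
        return sum;
    }

    // Hypothetical helper: builds the DStream without touching an RDD inside map().
    static JavaDStream<Integer> sumLengths(JavaSparkContext sc,
                                           JavaRDD<LengthGetter> objRdd,
                                           JavaDStream<String> sentences) {
        // On the driver: pull the (assumed small) RDD contents into a local list
        // and copy it into an ArrayList so a plain serializable list is broadcast.
        final List<LengthGetter> getters = new ArrayList<>(objRdd.collect());
        final Broadcast<List<LengthGetter>> getterBc = sc.broadcast(getters);

        // On the executors: only the broadcast value is used, never the RDD itself.
        return sentences.map(input -> totalLength(getterBc.value(), input));
    }
}
```

Calling sumLengths(sc, objRdd, sentences).print() would then take the place of the sentences.map(...).print() call in the report. With the three LengthGetter instances used there, each sentence maps to three times its length.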

So this pattern is incorrect and therefore this JIRA is kinda invalid. I am closing this JIRA.

> Doing a RDD map/reduce within a DStream map fails with a high enough input rate
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-3876
>                 URL: https://issues.apache.org/jira/browse/SPARK-3876
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.2
>            Reporter: Andrei Filip
>
> Having a custom receiver that generates random strings at custom rates: JavaRandomSentenceReceiver
> A class that does work on a received string:
> class LengthGetter implements Serializable{
> 	public int getStrLength(String s){
> 		return s.length();
> 	}
> }
> The following code:
> List<LengthGetter> objList = Arrays.asList(new LengthGetter(), new LengthGetter(), new LengthGetter());
> final JavaRDD<LengthGetter> objRdd = sc.parallelize(objList);
>
> JavaInputDStream<String> sentences = jssc.receiverStream(new JavaRandomSentenceReceiver(frequency));
>
> sentences.map(new Function<String, Integer>() {
> 	@Override
> 	public Integer call(final String input) throws Exception {
> 		Integer res = objRdd.map(new Function<LengthGetter, Integer>() {
> 			@Override
> 			public Integer call(LengthGetter lg) throws Exception {
> 				return lg.getStrLength(input);
> 			}
> 		}).reduce(new Function2<Integer, Integer, Integer>() {
> 			@Override
> 			public Integer call(Integer left, Integer right) throws Exception {
> 				return left + right;
> 			}
> 		});
> 		return res;
> 	}
> }).print();
> fails for high enough frequencies with the following stack trace:
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:0 failed 1 times, most recent failure: Exception failure in TID 3 on host localhost: java.lang.NullPointerException
>         org.apache.spark.rdd.RDD.map(RDD.scala:270)
>         org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:72)
>         org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:29)
> Other information that might be useful is that my current batch duration is set to 1sec and the frequencies for JavaRandomSentenceReceiver at which the application fails are as low as 2Hz (1Hz for example works)


