Posted to user@spark.apache.org by "Piubelli, Manuel " <ma...@citi.com.INVALID> on 2016/09/19 12:11:01 UTC

spark streaming slow checkpointing when calling Rserve


Hello,



I wrote a Spark Streaming application in Java. It reads stock trades from a data feed receiver, converts them to Tick objects, and uses a micro-batch interval, window interval, and sliding interval of 10 seconds each. It then creates a JavaPairDStream<String, Iterable<Tick>> whose key is the stock symbol.
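Because the micro-batch, window, and sliding intervals are all 10 seconds, each window covers exactly one micro-batch. The plain-Java sketch below (no Spark dependency) shows what one such batch looks like once grouped by symbol, mirroring the (symbol, Iterable<Tick>) pairs. The `Tick` fields (`symbol`, `price`) and the `BatchGrouping` helper are hypothetical; the post does not say what a Tick contains.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Tick: the real fields are not described in the post.
final class Tick {
    final String symbol;
    final double price;

    Tick(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }
}

final class BatchGrouping {
    // Groups one micro-batch of ticks by stock symbol, keeping first-seen order,
    // roughly what each batch of the grouped pair stream holds.
    static Map<String, List<Tick>> groupBySymbol(List<Tick> batch) {
        Map<String, List<Tick>> bySymbol = new LinkedHashMap<>();
        for (Tick t : batch) {
            bySymbol.computeIfAbsent(t.symbol, k -> new ArrayList<>()).add(t);
        }
        return bySymbol;
    }

    public static void main(String[] args) {
        List<Tick> batch = List.of(
            new Tick("IBM", 154.2), new Tick("MSFT", 57.3), new Tick("IBM", 154.3));
        Map<String, List<Tick>> grouped = groupBySymbol(batch);
        System.out.println(grouped.keySet());          // [IBM, MSFT]
        System.out.println(grouped.get("IBM").size()); // 2
    }
}
```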

The Tick objects are then stored in a JavaMapWithStateDStream using mapWithState, and analytics calculations are performed in the mapWithState callback function using the Ticks as input. Everything worked fine until I modified my program to also call Rserve inside the mapWithState callback function to perform additional analytics calculations in R.
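To make the callback's role concrete, here is a plain-Java stand-in (not Spark's API) for a mapWithState-style function: it receives the key, the optional new values for the batch, and the mutable per-key state, updates the state, and returns a per-key result. `SymbolState`, `MAX_PRICES`, `StateUpdate`, and the `analytics` hook (a placeholder for wherever an Rserve call would go) are all hypothetical, and prices are modeled as plain doubles rather than Tick objects. The state is kept bounded on purpose: the per-key state is what mapWithState checkpoints, so state that keeps growing (for example, if every Tick or every R result were appended to it) would make each checkpoint larger than the last, which may be worth ruling out.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.function.ToDoubleFunction;

// Hypothetical per-symbol state with a bounded price history.
final class SymbolState {
    static final int MAX_PRICES = 3; // keep only the most recent prices
    final Deque<Double> recentPrices = new ArrayDeque<>();
}

final class StateUpdate {
    // Mirrors the callback shape: key, optional new values, mutable state.
    // 'symbol' is unused here; 'analytics' stands in for the R calculation.
    static double update(String symbol,
                         Optional<List<Double>> newPrices,
                         SymbolState state,
                         ToDoubleFunction<double[]> analytics) {
        newPrices.ifPresent(prices -> {
            for (double p : prices) {
                state.recentPrices.addLast(p);
                // Evict the oldest price so the state never exceeds MAX_PRICES.
                if (state.recentPrices.size() > SymbolState.MAX_PRICES) {
                    state.recentPrices.removeFirst();
                }
            }
        });
        double[] window = state.recentPrices.stream()
                .mapToDouble(Double::doubleValue).toArray();
        return analytics.applyAsDouble(window);
    }

    public static void main(String[] args) {
        SymbolState state = new SymbolState();
        ToDoubleFunction<double[]> mean =
            xs -> Arrays.stream(xs).average().orElse(Double.NaN);
        update("IBM", Optional.of(List.of(1.0, 2.0)), state, mean);
        double m = update("IBM", Optional.of(List.of(3.0, 4.0)), state, mean);
        System.out.println(m);                         // 3.0 (mean of 2, 3, 4)
        System.out.println(state.recentPrices.size()); // 3
    }
}
```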

When I started calling Rserve, every 10th window began to take a long time to process; this is the window that also writes to the checkpoint file (I am using Hadoop). Each checkpoint window takes longer to process than the previous one (window 30 takes longer than window 20, which takes longer than window 10). All of the non-checkpoint windows finish well within 10 seconds, but the checkpoint windows can eventually take minutes to complete, and the other windows queue up behind them.
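The every-10th-window pattern is consistent with two details of Spark's implementation as it appears in the source: when no checkpoint interval is set, mapWithState defaults to 10 × the batch interval, and a DStream checkpoints a batch only when the batch time, measured from the stream's start, is a multiple of the checkpoint interval. The plain-Java sketch below only reproduces that arithmetic; `CheckpointSchedule` is a hypothetical helper, and the multiplier of 10 is an assumption about Spark's default rather than something stated in the post.

```java
import java.util.ArrayList;
import java.util.List;

final class CheckpointSchedule {
    // Returns the 1-based batch numbers (among the first n) whose time since
    // stream start is an exact multiple of the checkpoint interval.
    static List<Integer> checkpointBatches(long batchMs, long checkpointMs, int n) {
        List<Integer> result = new ArrayList<>();
        for (int k = 1; k <= n; k++) {
            if ((k * batchMs) % checkpointMs == 0) {
                result.add(k);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long batchMs = 10_000;            // 10-second micro-batches
        long checkpointMs = 10 * batchMs; // assumed default: 10 x batch interval
        System.out.println(checkpointBatches(batchMs, checkpointMs, 35)); // [10, 20, 30]
    }
}
```

With `Durations.minutes(1440)` instead, the same arithmetic gives one checkpoint every 8,640 batches (86,400 s / 10 s).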

I then tried setting the checkpoint interval on the JavaMapWithStateDStream to 24 hours to effectively disable checkpointing (mapWithStateStream.checkpoint(Durations.minutes(1440))). I gave the workers on the 3-server cluster enough memory to survive the growing memory usage that would result.

The results I logged were unexpected. The JavaPairDStream<String, Iterable<Tick>> was still being populated with 5000 keys, as before. But whereas 5000 keys were previously passed to the mapWithState callback function, now only 200 keys were passed to it, and I see many skipped stages in the Spark Streaming UI web page. When I run this in single-process mode on my MS Windows machine, 5000 keys are still passed to the mapWithState callback function.

Does anyone have any idea why calling Rserve would cause such a huge increase in checkpointing time, or why calling checkpoint(Durations.minutes(1440)) on the JavaMapWithStateDStream would cause Spark to stop passing most of the tuples in the JavaPairDStream<String, Iterable<Tick>> to the mapWithState callback function?



This question is also posted on Stack Overflow: http://stackoverflow.com/questions/39535804/spark-streaming-slow-checkpointing-when-calling-rserve.



Thanks