You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by onpoq l <on...@gmail.com> on 2014/06/09 18:48:33 UTC

How to achieve reasonable performance on Spark Streaming?

Dear All,

I recently installed Spark 1.0.0 on a 10-slave dedicate cluster. However,
the max input rate that the system can sustain with stable latency seems
very low. I use a simple word counting workload over tweets:

theDStream.flatMap(extractWordOnePairs).reduceByKey(sumFunc).count.print

With 2s batch interval, the 10-slave cluster can only handle ~ 30,000
tweets/s (which translates to ~ 300,000 words/s). To give you a sense about
the speed of a slave machine,  a single machine can handle ~ 100,000
tweets/s on a stream processing program in plain java.

I've tuned the following parameters without seeing obvious improvement:
1. Batch interval: 1s, 2s, 5s, 10s
2. Parallelism: 1 x total num of cores, 2x, 3x
3. StorageLevel: MEMORY_ONLY, MEMORY_ONLY_SER
4. Run type: yarn-client, standalone cluster

* My first question is: what are the max input rates you have observed on
Spark Streaming? I know it depends on the workload and the hardware. But I
just want to get some sense of the reasonable numbers.

* My second question is: any suggestion on what I can tune to improve the
performance? I've found unexpected delays in "reduce" that I can't explain,
and they may be related to the poor performance. Details are shown below

============= DETAILS =============

Below is the CPU utilization plot with 2s batch interval and 40,000
tweets/s. The latency keeps increasing while the CPU, network, disk and
memory are all under utilized.


I tried to find out which stage is the bottleneck. It seems that the
"reduce" phase for each batch can usually finish in less than 0.5s, but
sometimes (70 out of 545 batches) takes 5s. Below is a snapshot of the wet
UI showing the time taken by "reduce" in some batches where the normal
cases are marked in green and the abnormal case is marked in red:


I further look into all the tasks of a slow "reduce" stage. As shown by the
below snapshot, a small portion of the tasks are stragglers:


Here is the log of some slow "reduce" tasks on an executor, where the start
and end of the tasks are marked in red. They started at 21:55:43, and
completed at 21:55:48. During the 5s, I can only see shuffling at the
beginning and activities of input blocks.


...

For comparison, here is the log of the normal "reduce" tasks on the same
executor:


It seems that these slow "reduce" stages keep latency increasing. However,
I can't explain what causes the straggler tasks in the "reduce" stages.
Anybody has any idea?

Thanks.

Re: How to achieve reasonable performance on Spark Streaming?

Posted by Tathagata Das <ta...@gmail.com>.
Hello all,

Apologies for the late response, this thread went below my radar. There are
a number of things that can be done to improve the performance. Here are
some of them of the top of my head based on what you have mentioned. Most
of them are mentioned in the streaming guide's performance tuning section
<http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch>
.

1. Good input distribution - If the data ingestion can be distributed
across multiple receivers, then it is worth doing so. Partitioning the data
streams and parallelizing the receiving distributed
serialization-deserialization cost of receiving. Note that you can create
multiple input streams (presumably of the same type) and do a union on them
(StreamingContext.union) to create a single DStream out of them.

2. Good task distribution - The tasks, specially the first map stage on the
input stream(s), should be well distributed across the workers in the
cluster. If they are not, then it is sometimes worth repartitioning the
dstream (DStream.repartition) so that computation can be parallelized well.
Can improve performance.

3. Enable Kryo serialization - Set spark conf property "spark.serializer"
[see configuration guide
<http://spark.apache.org/docs/latest/configuration.html>] This can improve
performance all across the board.

4. Enable Concurrent MarkSweep GC - This helps with more stable batch
processing times. You see the gc overheads in stages in the web ui. If they
are in the order of 100s milliseconds, then its worth tuning the GC.

5. Store strings as byte arrays, and do all string transformations as byte
array transformations (that is, implemented extractWordOnePairs on
bytearrays). This can give a very significant boost is throughput as the
java string library is not very efficient. Granted that this requires more
work in manipulating bytes yourself.


Regarding the 5 seconds pauses, the culprit could very well be the disk.
Since shuffle file are written to disk, that often gives rise to
unpredictable delays. In our research paper
<http://www.cs.berkeley.edu/~matei/papers/2013/sosp_spark_streaming.pdf> we
had got much higher performance using in-memory shuffling. However, the
current version of Spark does not have in-memory shuffling for a number of
reasons (prevent RDD cache pollution with single-use shuffle data, reduce
chances of OOMs, etc.). It is recommended that you use a fast file system
(SSD?, RAMFS?) at the workers and make sure the local working directory
uses that. If you are interested in hacking in-memory shuffling within
Spark, you can hack around the org.apache.spark.storage.* (especially,
BlockManager) to make sure all shuffle related "blocks" go to MemoryStore
(look out for code deals with shuffle blocks and passes them directly to
DiskBlockManager). We are in the process of building in-memory shuffle in
spark, but its hard to give a ETA on that as of now.

To achieve even higher throughput (specifically the ones reported in the
paper), it require pre-serializing the data into the Spark format (kryo
format, if kryo serialization is used) as large byte arrays, receiving the
byte arrays directly and storing them in the receiver directly (without
deserializing them). This avoids serialization-deserialization costs of
receiving the data and can give higher throughput overall.

Let me know if this helps, or there are more detailed performance tuning
you are interested in.

TD



On Fri, Jun 13, 2014 at 11:22 AM, Michael Chang <mi...@tellapart.com> wrote:

> I'm interested in this issue as well.  I have spark streaming jobs that
> seems to run well for a while, but slowly degrade and don't recover.
>
>
> On Wed, Jun 11, 2014 at 11:08 PM, Boduo Li <on...@gmail.com> wrote:
>
>> It seems that the slow "reduce" tasks are caused by slow shuffling. Here
>> is
>> the logs regarding one slow "reduce" task:
>>
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_88_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_89_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_90_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_91_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_92_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_93_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_94_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_95_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_96_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_97_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_188_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_189_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_190_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_191_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_192_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_193_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_194_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_195_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_196_18 after  5029 ms
>> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
>> remote block shuffle_69_197_18 after  5029 ms
>> 14/06/11 23:42:45 INFO Executor: Serialized size of result for 23643 is
>> 1143
>> 14/06/11 23:42:45 INFO Executor: Sending result for 23643 directly to
>> driver
>> 14/06/11 23:42:45 INFO Executor: Finished task ID 23643
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-achieve-reasonable-performance-on-Spark-Streaming-tp7262p7454.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>

Re: How to achieve reasonable performance on Spark Streaming?

Posted by Michael Chang <mi...@tellapart.com>.
I'm interested in this issue as well.  I have spark streaming jobs that
seems to run well for a while, but slowly degrade and don't recover.


On Wed, Jun 11, 2014 at 11:08 PM, Boduo Li <on...@gmail.com> wrote:

> It seems that the slow "reduce" tasks are caused by slow shuffling. Here is
> the logs regarding one slow "reduce" task:
>
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_88_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_89_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_90_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_91_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_92_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_93_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_94_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_95_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_96_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_97_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_188_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_189_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_190_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_191_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_192_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_193_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_194_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_195_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_196_18 after  5029 ms
> 14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
> remote block shuffle_69_197_18 after  5029 ms
> 14/06/11 23:42:45 INFO Executor: Serialized size of result for 23643 is
> 1143
> 14/06/11 23:42:45 INFO Executor: Sending result for 23643 directly to
> driver
> 14/06/11 23:42:45 INFO Executor: Finished task ID 23643
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-achieve-reasonable-performance-on-Spark-Streaming-tp7262p7454.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: How to achieve reasonable performance on Spark Streaming?

Posted by Boduo Li <on...@gmail.com>.
It seems that the slow "reduce" tasks are caused by slow shuffling. Here is
the logs regarding one slow "reduce" task:

14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_88_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_89_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_90_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_91_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_92_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_93_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_94_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_95_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_96_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_97_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_188_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_189_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_190_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_191_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_192_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_193_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_194_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_195_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_196_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_197_18 after  5029 ms
14/06/11 23:42:45 INFO Executor: Serialized size of result for 23643 is 1143
14/06/11 23:42:45 INFO Executor: Sending result for 23643 directly to driver
14/06/11 23:42:45 INFO Executor: Finished task ID 23643




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-achieve-reasonable-performance-on-Spark-Streaming-tp7262p7454.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.