Posted to user@spark.apache.org by Jeff Nadler <jn...@srcginc.com> on 2015/10/05 23:28:32 UTC

Streaming Performance w/ UpdateStateByKey

While investigating performance problems in a Streaming application that uses
updateStateByKey, I found that serialization of state was a meaningful (though
not dominant) portion of our execution time.

In StateDStream.scala, serialized persistence is hard-coded:

     super.persist(StorageLevel.MEMORY_ONLY_SER)

I can see why that might be a good choice for a default. For our
workload, I made a clone that uses StorageLevel.MEMORY_ONLY. I've just
completed some tests and it is indeed faster, with the expected cost of
greater memory usage. For us that would be a good tradeoff.

I'm not taking any particular extra risks by doing this, am I?

Should this be configurable?  Perhaps yet another signature for
PairDStreamFunctions.updateStateByKey?

Thanks for sharing any thoughts-

Jef

Re: Streaming Performance w/ UpdateStateByKey

Posted by Adrian Tanase <at...@adobe.com>.
How are you determining how much time serialization is taking?

I made this change in a streaming app that relies heavily on updateStateByKey. Memory consumption on the executors went up 3x, but I can't see any performance improvement. Task execution time is the same, and the serialization time metric in the Spark UI is 0-1 ms in both scenarios.

Any idea where else to look, or why I am not seeing any performance uplift?

Thanks!
Adrian

Re: Streaming Performance w/ UpdateStateByKey

Posted by Tathagata Das <td...@databricks.com>.
You could call DStream.persist(StorageLevel.MEMORY_ONLY) on the
stateDStream returned by updateStateByKey to achieve the same. As you have
seen, the downside is greater memory usage, and also higher GC overhead
(that's usually the main one). So I suggest you run your benchmarks for
long enough to see what the GC overhead is. If it turns out that some
batches randomly take longer because a task in some executor is stuck in
GC, then it's going to be bad.
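
A minimal sketch of that suggestion, assuming Spark 1.3 or later with spark-streaming on the classpath. The socket source, the checkpoint path, and the word-count update function are hypothetical stand-ins, not taken from the original application:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DeserializedStateExample {
  // Hypothetical update function: keeps a running count per key.
  def updateCount(newValues: Seq[Int], state: Option[Int]): Option[Int] =
    Some(state.getOrElse(0) + newValues.sum)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("DeserializedStateExample")
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey requires a checkpoint directory; this path is an assumption.
    ssc.checkpoint("/tmp/state-checkpoint")

    // Assumed input: whitespace-separated words on a local socket.
    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    val state = pairs.updateStateByKey[Int](updateCount _)
    // Replace the MEMORY_ONLY_SER level set inside StateDStream.
    // This must be called before ssc.start().
    state.persist(StorageLevel.MEMORY_ONLY)

    state.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

This avoids cloning StateDStream: the storage level is overridden on the returned stream, and the rest of the job is unchanged.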

Alternatively, you could also start playing with CMS GC, etc.
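
One way to try this, sketched under the assumption of a Java 7 or 8 executor JVM (these flags were deprecated or removed in later JDKs). The application name is hypothetical, and the GC logging flags are an addition to make pauses visible in the executor logs:

```scala
import org.apache.spark.SparkConf

object GcTuningConf {
  // Builds a SparkConf that asks executors to use the CMS collector
  // and to log GC activity so long pauses can be matched to slow batches.
  def build(): SparkConf =
    new SparkConf()
      .setAppName("StateGcExperiment") // hypothetical name
      .set("spark.executor.extraJavaOptions",
        "-XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
}
```

The same options can be passed with --conf on spark-submit instead of being set in code.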

BTW, it would be amazing if you could share the numbers from your benchmarks:
the number of states, how complex the objects in the state are, what the
processing time is, and what the improvements are.

TD

