Posted to user@spark.apache.org by chris-sw <ch...@semmelwise.nl> on 2018/01/19 07:28:10 UTC

[Spark structured streaming] Use of (flat)mapgroupswithstate takes long time

Hi,

I recently ran some experiments with stateful Structured Streaming using
flatMapGroupsWithState. The streaming application is quite simple: it
receives data from Kafka, feeds it to the stateful operator
(flatMapGroupsWithState), and writes the output to the console.
During a test with small datasets (3-5 records per batch) I experienced long
batch runs.

Taking a look at the log I see an explosion of tasks with these log entries:
-----
2018-01-18 13:33:46,668 [Executor task launch worker for task 287] INFO
org.apache.spark.executor.Executor - Running task 85.0 in stage 3.0 (TID
287)
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
- Retrieved version 1 of HDFSStateStoreProvider[id = (op=0, part=85), dir =
/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85] for update
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 0 non-empty
blocks out of 1 blocks
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 0 remote
fetches in 0 ms
2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
- Committed version 2 for
HDFSStateStore[id=(op=0,part=85),dir=/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85]
to file
/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85/2.delta
2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO
org.apache.spark.executor.Executor - Finished task 85.0 in stage 3.0 (TID
287). 2212 bytes result sent to driver
-----
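
As a reading aid for the excerpt above, here is a small sketch in Python (not part of the original application) that takes the first and last timestamps logged for task 85.0 and computes the time between them. The timestamp format string is an assumption based on the lines shown, and the function name is only illustrative.

```python
from datetime import datetime, timedelta

# Timestamp layout assumed from the log lines above (comma before the milliseconds).
LOG_TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def elapsed_ms(start: str, end: str) -> int:
    """Return the whole milliseconds between two log timestamps."""
    delta = datetime.strptime(end, LOG_TS_FORMAT) - datetime.strptime(start, LOG_TS_FORMAT)
    return delta // timedelta(milliseconds=1)


# Timestamps of "Running task 85.0" and "Finished task 85.0", copied from the excerpt.
task_ms = elapsed_ms("2018-01-18 13:33:46,668", "2018-01-18 13:33:46,691")
print(task_ms)  # 23
```

So this task ran for roughly 23 ms although it fetched 0 non-empty shuffle blocks, and it still committed a new state version (2.delta). One excerpt cannot show whether every task in the stage behaves this way.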

A batch run takes approx. 5 seconds, and most of that time seems to be spent
on tasks like the one above.
I have built several apps with the non-Spark-SQL approach (mapWithState)
and never experienced such long batch runs.

Has anyone else experienced this, or can anyone help me find a solution for
these long runs?

Regards,

Chris



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: [Spark structured streaming] Use of (flat)mapgroupswithstate takes long time

Posted by Christiaan Ras <ch...@semmelwise.nl>.
Hi TD,

Thanks for taking the time to review my question.
Answers to your questions:
- How many tasks are being launched in the reduce stage (that is, the stage after the shuffle, that is computing mapGroupsWithState)
In the dashboard I count 200 tasks in the stage containing: Exchange -> WholeStageCodegen -> FlatMapGroupsWithState -> WholeStageCodegen.
I understand that flatMapGroupsWithState uses 200 partitions for storing the state by default. Can I control the partitioning within flatMapGroupsWithState?
Besides that, I guess 200 partitions is far too many for the dataset I’m using in my small test, but I cannot imagine that a larger dataset would perform better with my setup.

- How long each task is taking?
The 75th percentile is 54 ms (with a maximum of 2 s for a single task). According to the Event Timeline, most of that time is ‘Computing Time’.

- How many cores does the cluster have?
The cluster is small and has 2 workers, each using 1 core. I’m wondering how Spark determines the number of cores when using Docker (a single host with multiple Spark containers).
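
The figures above can be combined into a back-of-envelope estimate. The sketch below is in Python and assumes a deliberately simple model: tasks run in waves of one task per core, and every task takes the reported 75th-percentile time. It ignores scheduling overhead and the 2 s outlier. The 200 tasks match the default value of `spark.sql.shuffle.partitions`, which is the setting that determines the number of shuffle (and state) partitions. The function names `estimated_stage_seconds` and `build_session` are hypothetical, and PySpark is used only to show where the setting goes; the application in this thread uses the Scala/Java operator.

```python
import math


def estimated_stage_seconds(num_tasks: int, task_ms: float, total_cores: int) -> float:
    """Rough wall-clock time of a stage whose tasks run in waves of `total_cores`."""
    waves = math.ceil(num_tasks / total_cores)
    return waves * task_ms / 1000.0


# Figures reported in this thread: 200 tasks, about 54 ms each, 2 cores in total.
print(estimated_stage_seconds(200, 54, 2))  # 5.4

# Hypothetical alternative for a small test: one partition per core.
print(estimated_stage_seconds(2, 54, 2))  # 0.054


def build_session(app_name: str, shuffle_partitions: int):
    """Create a SparkSession with a lower shuffle partition count (illustrative only)."""
    # Imported here so the estimate above runs without PySpark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.sql.shuffle.partitions", str(shuffle_partitions))
        .getOrCreate()
    )
```

Under these assumptions the estimate of about 5.4 s is close to the observed batch time of approx. 5 seconds, which suggests that the per-task overhead of 200 mostly empty partitions, rather than the data volume, dominates the batch. Note that for a stateful query the partition count is tied to the checkpoint it first ran with, so a changed value would likely need a fresh checkpoint location to take effect.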

Regards,
Chris


From: Tathagata Das <ta...@gmail.com>
Date: Tuesday, 23 January 2018 at 00:04
To: Christiaan Ras <ch...@semmelwise.nl>
Cc: user <us...@spark.apache.org>
Subject: Re: [Spark structured streaming] Use of (flat)mapgroupswithstate takes long time

For computing mapGroupsWithState, can you check the following.
- How many tasks are being launched in the reduce stage (that is, the stage after the shuffle, that is computing mapGroupsWithState)
- How long each task is taking?
- How many cores does the cluster have?




Re: [Spark structured streaming] Use of (flat)mapgroupswithstate takes long time

Posted by Tathagata Das <ta...@gmail.com>.
For computing mapGroupsWithState, can you check the following.
- How many tasks are being launched in the reduce stage (that is, the stage
after the shuffle, that is computing mapGroupsWithState)
- How long each task is taking?
- How many cores does the cluster have?

