You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Gerard Maas <ge...@gmail.com> on 2019/03/25 19:17:27 UTC

Understanding State Store storage behavior for the Stream Deduplication function

Sparkers,

I'm trying to understand the clean-up behavior of the state store for the
case of stream deduplication.

I'm running a simple test with an unbounded stream and running
deduplication on it with and without watermark.
While my expectation is that the version with watermark should show a
bounded disk usage after the watermark time is reached, it turns out that
its disk demand keeps increasing. Even worse, the disk usage is larger than
the unbounded version.

This test ran for 8hrs, taking a sample of disk usage at OS level (du) for
the /state folder created by the query execution.

[image: image.png]
The code for the test: (executed in spark-shell, in local mode)

// with watermark
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
spark.conf.set("spark.sql.shuffle.partitions", "10")
val stream = spark.readStream.format("rate").load()
val dedup = stream.withWatermark("timestamp", "5
minutes").dropDuplicates("value", "timestamp")
val query = dedup.writeStream.format("json").option("path",
"/tmp/spark/results").start

// without watermark
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
spark.conf.set("spark.sql.shuffle.partitions", "10")
val stream = spark.readStream.format("rate").load()
val dedup = stream.dropDuplicates("value", "timestamp")
val query = dedup.writeStream.format("json").option("path",
"/tmp/spark/results").start

What could be going on here?

Thanks,  Gerard.

Re: Understanding State Store storage behavior for the Stream Deduplication function

Posted by Arun Mahadevan <ar...@apache.org>.
Check the number of rows in your state (from query.lastProgress) and see if
thats stabilizing after some time. Also look inside the "state" folder of
the checkpoint directory to see if the old delta files are getting cleaned
up (you should see roughly around 100 delta files). If both are happening
the state disk usage should not increase. If not it might be a bug.

Thanks,
Arun



On Mon, 1 Apr 2019 at 09:48, Gerard Maas <ge...@gmail.com> wrote:

> Hi Ryan,
>
> I ran the test overnight for roughly 8hrs. Using the rate source with
> default values and the 'best effort' trigger, you get a batch/second, so it
> ran for ~28k batches.
> As you can see in the charts, the disk storage keeps on increasing.
>
> I had to drop this issue last week as I had some priority tasks to take
> care of. I'm going to run these tests again on a cluster to reproduce on a
> different system and compare results.
>
> regards, Gerard.
>
>
>
> On Mon, Apr 1, 2019 at 6:20 PM Shixiong(Ryan) Zhu <sh...@databricks.com>
> wrote:
>
>> How many batches you ran for your test? Spark keeps 100 latest batches by
>> default. I would expect that the disk usage of distributed file system will
>> be stable after 100 batches as long as the state store size is stable.
>>
>> On Mon, Mar 25, 2019 at 5:00 PM Gerard Maas <ge...@gmail.com>
>> wrote:
>>
>>> Thanks Arun,
>>>
>>> Exploring query progress makes sense to understand the internal
>>> behavior. I'm more interested in resource usage.
>>> We recently faced recurring issues with jobs running out of storage. It
>>> turned out that is was due to the large amount of small files that the
>>> state store produces combined with a slightly increased block size [1].
>>> It was using several Gb of disk space of a PVC mounted on GlusterFS to
>>> keep the state of two counts (good/bad samples)
>>>
>>> In the new world of Kubernetes, where resources are explicitly requested
>>> + assigned, stable storage becomes very important.
>>> (I'm wondering how the state store performs on HDFS and its huge block
>>> sizes)
>>>
>>> Going back to the scenario exposed here, this job will eventually crash
>>> because the disk usage keeps increasing.
>>> In theory, watermarks should prevent that, and I'm wondering why this is
>>> not the case.
>>>
>>> Any ideas? Or should I open a Jira already?
>>>
>>> kr, Gerard.
>>>
>>>
>>>
>>>
>>> On Mon, Mar 25, 2019 at 11:13 PM Arun Mahadevan <ar...@apache.org>
>>> wrote:
>>>
>>>> The disk usage of the state folder might not be the right one to look
>>>> at. The space usage would be dominated by the number of delta files and the
>>>> removes are written out as tombstone records. You might not see the
>>>> difference until the files are compacted.
>>>>
>>>> You should instead look at the state operator metrics in the query
>>>> progress.
>>>>
>>>> E.g. query.lastProgress
>>>> ..
>>>>   "stateOperators" : [ {
>>>>     "numRowsTotal" : ...,
>>>>     "numRowsUpdated" : ..,
>>>>     "memoryUsedBytes" : ...
>>>>   } ],
>>>> ...
>>>>
>>>> Thanks,
>>>> Arun
>>>>
>>>>
>>>> On Mon, 25 Mar 2019 at 12:18, Gerard Maas <ge...@gmail.com>
>>>> wrote:
>>>>
>>>>> Sparkers,
>>>>>
>>>>> I'm trying to understand the clean-up behavior of the state store for
>>>>> the case of stream deduplication.
>>>>>
>>>>> I'm running a simple test with an unbounded stream and running
>>>>> deduplication on it with and without watermark.
>>>>> While my expectation is that the version with watermark should show a
>>>>> bounded disk usage after the watermark time is reached, it turns out that
>>>>> its disk demand keeps increasing. Even worse, the disk usage is larger than
>>>>> the unbounded version.
>>>>>
>>>>> This test ran for 8hrs, taking a sample of disk usage at OS level (du)
>>>>> for the /state folder created by the query execution.
>>>>>
>>>>> [image: image.png]
>>>>> The code for the test: (executed in spark-shell, in local mode)
>>>>>
>>>>> // with watermark
>>>>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>>>>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>>>>> val stream = spark.readStream.format("rate").load()
>>>>> val dedup = stream.withWatermark("timestamp", "5
>>>>> minutes").dropDuplicates("value", "timestamp")
>>>>> val query = dedup.writeStream.format("json").option("path",
>>>>> "/tmp/spark/results").start
>>>>>
>>>>> // without watermark
>>>>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>>>>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>>>>> val stream = spark.readStream.format("rate").load()
>>>>> val dedup = stream.dropDuplicates("value", "timestamp")
>>>>> val query = dedup.writeStream.format("json").option("path",
>>>>> "/tmp/spark/results").start
>>>>>
>>>>> What could be going on here?
>>>>>
>>>>> Thanks,  Gerard.
>>>>>
>>>>>
>>>> --
>>
>> Best Regards,
>> Ryan
>>
>

Re: Understanding State Store storage behavior for the Stream Deduplication function

Posted by Gerard Maas <ge...@gmail.com>.
Hi Ryan,

I ran the test overnight for roughly 8hrs. Using the rate source with
default values and the 'best effort' trigger, you get a batch/second, so it
ran for ~28k batches.
As you can see in the charts, the disk storage keeps on increasing.

I had to drop this issue last week as I had some priority tasks to take
care of. I'm going to run these tests again on a cluster to reproduce on a
different system and compare results.

regards, Gerard.



On Mon, Apr 1, 2019 at 6:20 PM Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:

> How many batches you ran for your test? Spark keeps 100 latest batches by
> default. I would expect that the disk usage of distributed file system will
> be stable after 100 batches as long as the state store size is stable.
>
> On Mon, Mar 25, 2019 at 5:00 PM Gerard Maas <ge...@gmail.com> wrote:
>
>> Thanks Arun,
>>
>> Exploring query progress makes sense to understand the internal behavior.
>> I'm more interested in resource usage.
>> We recently faced recurring issues with jobs running out of storage. It
>> turned out that is was due to the large amount of small files that the
>> state store produces combined with a slightly increased block size [1].
>> It was using several Gb of disk space of a PVC mounted on GlusterFS to
>> keep the state of two counts (good/bad samples)
>>
>> In the new world of Kubernetes, where resources are explicitly requested
>> + assigned, stable storage becomes very important.
>> (I'm wondering how the state store performs on HDFS and its huge block
>> sizes)
>>
>> Going back to the scenario exposed here, this job will eventually crash
>> because the disk usage keeps increasing.
>> In theory, watermarks should prevent that, and I'm wondering why this is
>> not the case.
>>
>> Any ideas? Or should I open a Jira already?
>>
>> kr, Gerard.
>>
>>
>>
>>
>> On Mon, Mar 25, 2019 at 11:13 PM Arun Mahadevan <ar...@apache.org> wrote:
>>
>>> The disk usage of the state folder might not be the right one to look
>>> at. The space usage would be dominated by the number of delta files and the
>>> removes are written out as tombstone records. You might not see the
>>> difference until the files are compacted.
>>>
>>> You should instead look at the state operator metrics in the query
>>> progress.
>>>
>>> E.g. query.lastProgress
>>> ..
>>>   "stateOperators" : [ {
>>>     "numRowsTotal" : ...,
>>>     "numRowsUpdated" : ..,
>>>     "memoryUsedBytes" : ...
>>>   } ],
>>> ...
>>>
>>> Thanks,
>>> Arun
>>>
>>>
>>> On Mon, 25 Mar 2019 at 12:18, Gerard Maas <ge...@gmail.com> wrote:
>>>
>>>> Sparkers,
>>>>
>>>> I'm trying to understand the clean-up behavior of the state store for
>>>> the case of stream deduplication.
>>>>
>>>> I'm running a simple test with an unbounded stream and running
>>>> deduplication on it with and without watermark.
>>>> While my expectation is that the version with watermark should show a
>>>> bounded disk usage after the watermark time is reached, it turns out that
>>>> its disk demand keeps increasing. Even worse, the disk usage is larger than
>>>> the unbounded version.
>>>>
>>>> This test ran for 8hrs, taking a sample of disk usage at OS level (du)
>>>> for the /state folder created by the query execution.
>>>>
>>>> [image: image.png]
>>>> The code for the test: (executed in spark-shell, in local mode)
>>>>
>>>> // with watermark
>>>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>>>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>>>> val stream = spark.readStream.format("rate").load()
>>>> val dedup = stream.withWatermark("timestamp", "5
>>>> minutes").dropDuplicates("value", "timestamp")
>>>> val query = dedup.writeStream.format("json").option("path",
>>>> "/tmp/spark/results").start
>>>>
>>>> // without watermark
>>>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>>>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>>>> val stream = spark.readStream.format("rate").load()
>>>> val dedup = stream.dropDuplicates("value", "timestamp")
>>>> val query = dedup.writeStream.format("json").option("path",
>>>> "/tmp/spark/results").start
>>>>
>>>> What could be going on here?
>>>>
>>>> Thanks,  Gerard.
>>>>
>>>>
>>> --
>
> Best Regards,
> Ryan
>

Re: Understanding State Store storage behavior for the Stream Deduplication function

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
How many batches you ran for your test? Spark keeps 100 latest batches by
default. I would expect that the disk usage of distributed file system will
be stable after 100 batches as long as the state store size is stable.

On Mon, Mar 25, 2019 at 5:00 PM Gerard Maas <ge...@gmail.com> wrote:

> Thanks Arun,
>
> Exploring query progress makes sense to understand the internal behavior.
> I'm more interested in resource usage.
> We recently faced recurring issues with jobs running out of storage. It
> turned out that is was due to the large amount of small files that the
> state store produces combined with a slightly increased block size [1].
> It was using several Gb of disk space of a PVC mounted on GlusterFS to
> keep the state of two counts (good/bad samples)
>
> In the new world of Kubernetes, where resources are explicitly requested +
> assigned, stable storage becomes very important.
> (I'm wondering how the state store performs on HDFS and its huge block
> sizes)
>
> Going back to the scenario exposed here, this job will eventually crash
> because the disk usage keeps increasing.
> In theory, watermarks should prevent that, and I'm wondering why this is
> not the case.
>
> Any ideas? Or should I open a Jira already?
>
> kr, Gerard.
>
>
>
>
> On Mon, Mar 25, 2019 at 11:13 PM Arun Mahadevan <ar...@apache.org> wrote:
>
>> The disk usage of the state folder might not be the right one to look at.
>> The space usage would be dominated by the number of delta files and the
>> removes are written out as tombstone records. You might not see the
>> difference until the files are compacted.
>>
>> You should instead look at the state operator metrics in the query
>> progress.
>>
>> E.g. query.lastProgress
>> ..
>>   "stateOperators" : [ {
>>     "numRowsTotal" : ...,
>>     "numRowsUpdated" : ..,
>>     "memoryUsedBytes" : ...
>>   } ],
>> ...
>>
>> Thanks,
>> Arun
>>
>>
>> On Mon, 25 Mar 2019 at 12:18, Gerard Maas <ge...@gmail.com> wrote:
>>
>>> Sparkers,
>>>
>>> I'm trying to understand the clean-up behavior of the state store for
>>> the case of stream deduplication.
>>>
>>> I'm running a simple test with an unbounded stream and running
>>> deduplication on it with and without watermark.
>>> While my expectation is that the version with watermark should show a
>>> bounded disk usage after the watermark time is reached, it turns out that
>>> its disk demand keeps increasing. Even worse, the disk usage is larger than
>>> the unbounded version.
>>>
>>> This test ran for 8hrs, taking a sample of disk usage at OS level (du)
>>> for the /state folder created by the query execution.
>>>
>>> [image: image.png]
>>> The code for the test: (executed in spark-shell, in local mode)
>>>
>>> // with watermark
>>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>>> val stream = spark.readStream.format("rate").load()
>>> val dedup = stream.withWatermark("timestamp", "5
>>> minutes").dropDuplicates("value", "timestamp")
>>> val query = dedup.writeStream.format("json").option("path",
>>> "/tmp/spark/results").start
>>>
>>> // without watermark
>>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>>> val stream = spark.readStream.format("rate").load()
>>> val dedup = stream.dropDuplicates("value", "timestamp")
>>> val query = dedup.writeStream.format("json").option("path",
>>> "/tmp/spark/results").start
>>>
>>> What could be going on here?
>>>
>>> Thanks,  Gerard.
>>>
>>>
>> --

Best Regards,
Ryan

Re: Understanding State Store storage behavior for the Stream Deduplication function

Posted by Gerard Maas <ge...@gmail.com>.
Thanks Arun,

Exploring query progress makes sense to understand the internal behavior.
I'm more interested in resource usage.
We recently faced recurring issues with jobs running out of storage. It
turned out that is was due to the large amount of small files that the
state store produces combined with a slightly increased block size [1].
It was using several Gb of disk space of a PVC mounted on GlusterFS to keep
the state of two counts (good/bad samples)

In the new world of Kubernetes, where resources are explicitly requested +
assigned, stable storage becomes very important.
(I'm wondering how the state store performs on HDFS and its huge block
sizes)

Going back to the scenario exposed here, this job will eventually crash
because the disk usage keeps increasing.
In theory, watermarks should prevent that, and I'm wondering why this is
not the case.

Any ideas? Or should I open a Jira already?

kr, Gerard.




On Mon, Mar 25, 2019 at 11:13 PM Arun Mahadevan <ar...@apache.org> wrote:

> The disk usage of the state folder might not be the right one to look at.
> The space usage would be dominated by the number of delta files and the
> removes are written out as tombstone records. You might not see the
> difference until the files are compacted.
>
> You should instead look at the state operator metrics in the query
> progress.
>
> E.g. query.lastProgress
> ..
>   "stateOperators" : [ {
>     "numRowsTotal" : ...,
>     "numRowsUpdated" : ..,
>     "memoryUsedBytes" : ...
>   } ],
> ...
>
> Thanks,
> Arun
>
>
> On Mon, 25 Mar 2019 at 12:18, Gerard Maas <ge...@gmail.com> wrote:
>
>> Sparkers,
>>
>> I'm trying to understand the clean-up behavior of the state store for the
>> case of stream deduplication.
>>
>> I'm running a simple test with an unbounded stream and running
>> deduplication on it with and without watermark.
>> While my expectation is that the version with watermark should show a
>> bounded disk usage after the watermark time is reached, it turns out that
>> its disk demand keeps increasing. Even worse, the disk usage is larger than
>> the unbounded version.
>>
>> This test ran for 8hrs, taking a sample of disk usage at OS level (du)
>> for the /state folder created by the query execution.
>>
>> [image: image.png]
>> The code for the test: (executed in spark-shell, in local mode)
>>
>> // with watermark
>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>> val stream = spark.readStream.format("rate").load()
>> val dedup = stream.withWatermark("timestamp", "5
>> minutes").dropDuplicates("value", "timestamp")
>> val query = dedup.writeStream.format("json").option("path",
>> "/tmp/spark/results").start
>>
>> // without watermark
>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>> val stream = spark.readStream.format("rate").load()
>> val dedup = stream.dropDuplicates("value", "timestamp")
>> val query = dedup.writeStream.format("json").option("path",
>> "/tmp/spark/results").start
>>
>> What could be going on here?
>>
>> Thanks,  Gerard.
>>
>>
>

Re: Understanding State Store storage behavior for the Stream Deduplication function

Posted by Arun Mahadevan <ar...@apache.org>.
The disk usage of the state folder might not be the right one to look at.
The space usage would be dominated by the number of delta files and the
removes are written out as tombstone records. You might not see the
difference until the files are compacted.

You should instead look at the state operator metrics in the query progress.

E.g. query.lastProgress
..
  "stateOperators" : [ {
    "numRowsTotal" : ...,
    "numRowsUpdated" : ..,
    "memoryUsedBytes" : ...
  } ],
...

Thanks,
Arun


On Mon, 25 Mar 2019 at 12:18, Gerard Maas <ge...@gmail.com> wrote:

> Sparkers,
>
> I'm trying to understand the clean-up behavior of the state store for the
> case of stream deduplication.
>
> I'm running a simple test with an unbounded stream and running
> deduplication on it with and without watermark.
> While my expectation is that the version with watermark should show a
> bounded disk usage after the watermark time is reached, it turns out that
> its disk demand keeps increasing. Even worse, the disk usage is larger than
> the unbounded version.
>
> This test ran for 8hrs, taking a sample of disk usage at OS level (du) for
> the /state folder created by the query execution.
>
> [image: image.png]
> The code for the test: (executed in spark-shell, in local mode)
>
> // with watermark
> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
> spark.conf.set("spark.sql.shuffle.partitions", "10")
> val stream = spark.readStream.format("rate").load()
> val dedup = stream.withWatermark("timestamp", "5
> minutes").dropDuplicates("value", "timestamp")
> val query = dedup.writeStream.format("json").option("path",
> "/tmp/spark/results").start
>
> // without watermark
> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
> spark.conf.set("spark.sql.shuffle.partitions", "10")
> val stream = spark.readStream.format("rate").load()
> val dedup = stream.dropDuplicates("value", "timestamp")
> val query = dedup.writeStream.format("json").option("path",
> "/tmp/spark/results").start
>
> What could be going on here?
>
> Thanks,  Gerard.
>
>