Posted to dev@spark.apache.org by Brian Cho <ch...@gmail.com> on 2016/05/11 19:01:32 UTC

Adding HDFS read-time metrics per task (RE: SPARK-1683)

Hi,

I'm interested in adding read-time (from HDFS) to Task Metrics. The
motivation is to help debug performance issues. After some digging, I found
it briefly mentioned in SPARK-1683 that this feature didn't make it because
metric collection caused a performance regression [1].

I'd like to try tackling this, but would be very grateful if those with
experience could give some more information on what was attempted previously
and why it didn't work, or whether there are philosophical objections to
these metrics. If you feel this is a dead end, please help save me from
myself.

Thank you,
Brian

[1] https://github.com/apache/spark/pull/962

Re: Adding HDFS read-time metrics per task (RE: SPARK-1683)

Posted by Brian Cho <ch...@gmail.com>.
Kay -- we would like to add the read metrics (in a compatible way) to our
internal DFS at Facebook and then call that method from Spark. In parallel,
if you can finish up HADOOP-11873 :), we could add hooks to those metrics in
Spark. What do you think? Does this look like a feasible plan for getting the
metrics in?

Thanks,
Brian

On Thu, May 12, 2016 at 12:12 PM, Steve Loughran <st...@hortonworks.com>
wrote:

>
> On 12 May 2016, at 04:44, Brian Cho <ch...@gmail.com> wrote:
>
> Hi Kay,
>
> Thank you for the detailed explanation.
>
> If I understand correctly, I *could* time each record's processing by
> measuring the time spent in reader.next, but this would add overhead for
> every single record. This is the method that was abandoned because of
> performance regressions.
>
> The other possibility is changing HDFS first. This method looks promising
> even if it takes some time. I'll play around with it a bit for now. Thanks
> again!
>
> -Brian
>
> On Wed, May 11, 2016 at 4:45 PM, Kay Ousterhout <ke...@eecs.berkeley.edu>
> wrote:
>
>> Hi Brian,
>>
>> Unfortunately it's not possible to do this in Spark for two reasons.
>> First, Spark reads records one at a time (e.g., if you're reading an
>> HDFS file and performing some map function, one record will be read from
>> HDFS, then the map function will be applied, then the next record will be
>> read, etc.). The relevant code is here
>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L209>:
>> we create an iterator that's then passed on to other downstream RDDs.  As a
>> result, we'd need to time each record's processing, which adds too much
>> overhead.
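
As a concrete illustration of that per-record cost, here is a minimal sketch
(not Spark's actual HadoopRDD code; the helper and the addReadTime callback
are hypothetical) of what timing reader.next for every record would look
like, and why it pays two clock calls per record:

    import org.apache.hadoop.mapred.RecordReader

    // Hypothetical helper: time a single call to RecordReader.next.
    // Two System.nanoTime() calls per record is exactly the per-record
    // overhead that caused the earlier regression.
    def timedNext[K, V](reader: RecordReader[K, V], key: K, value: V)
                       (addReadTime: Long => Unit): Boolean = {
      val start = System.nanoTime()
      val gotRecord = reader.next(key, value)   // reads at most one record
      addReadTime(System.nanoTime() - start)
      gotRecord
    }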
>>
>> The other potential issue is that we use the RecordReader interface,
>> which means that we get deserialized and decompressed records, so any time
>> we measured would include time to read the data from disk and
>> decompress/deserialize it (not sure if you're trying to isolate the disk
>> time).
>>
>
> Measuring decompression overhead alone is interesting. Indeed, with
> encryption at rest and erasure coding in Hadoop, you'd think about
> isolating work there too, to see where the bottlenecks move after a
> switch to SSDs.
>
>
>> It *is* possible to do this instrumentation for disk read time in HDFS,
>> because HDFS reads larger blocks from disk (and then passes them to Spark
>> one by one), and I did that (in a hacky way) in the most recent commits
>> in this Hadoop branch
>> <https://github.com/kayousterhout/hadoop-common/commits/2.0.2-instrumented>.
>> I filed a Hadoop JIRA
>> <https://issues.apache.org/jira/browse/HADOOP-11873> to add this (in a
>> less hacky way, using FileSystem.Statistics) but haven't submitted a patch
>> for it.  If there's sufficient interest, I could properly implement the
>> metrics and see if it could be merged into Hadoop, at which point Spark
>> could start reading those metrics (unfortunately, the delay for this would
>> be pretty significant because we'd need to wait for a new Hadoop version
>> and then a new Spark version, and it would only be available in newer
>> versions of Hadoop).
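
For context, a small sketch of what a Spark task can already read from
FileSystem.Statistics today (byte and operation counters only); a read-time
counter is precisely what HADOOP-11873 would add, so no such accessor is
assumed here:

    import org.apache.hadoop.fs.FileSystem
    import scala.collection.JavaConverters._

    // Sum the bytes read through the "hdfs" scheme in this JVM.
    // FileSystem.Statistics exposes counters, not timings.
    def hdfsBytesRead(): Long =
      FileSystem.getAllStatistics.asScala
        .filter(_.getScheme == "hdfs")
        .map(_.getBytesRead)
        .sum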
>>
>
> The metrics API changed 19 hours ago into something more sophisticated,
> though it doesn't measure timings.
>
> https://issues.apache.org/jira/browse/HADOOP-13065
>
> It's designed to be more extensible; you'll ask for a metric by name, not a
> compile-time field. This will let different filesystems add different
> values.
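
A rough sketch of that by-name lookup, assuming the StorageStatistics API
roughly as it landed in HADOOP-13065 (the patch is brand new, so method and
key names may still shift):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.JavaConverters._

    val fs = FileSystem.get(new Path("hdfs:///").toUri, new Configuration())
    val stats = fs.getStorageStatistics

    // Enumerate whatever this filesystem chooses to publish...
    stats.getLongStatistics.asScala.foreach { s =>
      println(s"${s.getName} = ${s.getValue}")
    }
    // ...or ask for one metric by name instead of a compile-time field
    // ("bytesRead" is an assumed key; it depends on the filesystem).
    val bytesRead = Option(stats.getLong("bytesRead"))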
>
> A few minutes ago, https://issues.apache.org/jira/browse/HADOOP-13028 went
> in to do some metric work for Spark; there the stats can be printed in
> logs, because the filesystem and input stream toString() methods return
> the metrics. That's for people, not machines; the text may break without
> warning. But you can at least dump the metrics in your logs to see what's
> going on. That stuff can be seen in downstream tests, but not directly
> published as metrics. The aggregate stats are also collected as metrics2
> stats, which should somehow be convertible to Coda Hale metrics, and hence
> integrated with the rest of Spark's monitoring.
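
To make the "dump it in your logs" option concrete: a sketch assuming a
filesystem that the patch actually instruments (HADOOP-13028 targets S3A; a
plain HDFS stream's toString() won't carry these counters), with a made-up
path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val path = new Path("s3a://some-bucket/some/file")   // hypothetical path
    val fs = FileSystem.get(path.toUri, new Configuration())
    val in = fs.open(path)
    try {
      val buf = new Array[Byte](1 << 20)
      while (in.read(buf) >= 0) {}                        // drain the stream
    } finally in.close()

    // Human-readable only; the text format may change without warning.
    println(s"filesystem stats: $fs")
    println(s"stream stats:     $in")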
>
>
> A more straightforward action might just be for Spark itself to
> subclass FilterFileSystem and implement operation timing there, both for
> the filesystem operations themselves and for any input/output streams
> returned by create & open.
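
A minimal sketch of that idea, under the assumption that Spark routed its
Hadoop reads through such a wrapper. FilterFileSystem, FSDataInputStream,
Seekable and PositionedReadable are real Hadoop classes; the class names, the
counter, and the choice to time only the read paths are illustrative:

    import java.io.InputStream
    import java.util.concurrent.atomic.AtomicLong
    import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, FilterFileSystem,
      Path, PositionedReadable, Seekable}

    // Where a real patch would publish into TaskMetrics; a plain counter here.
    object HdfsReadTime { val nanos = new AtomicLong() }

    // Delegating stream that times the read paths. It implements Seekable and
    // PositionedReadable so it can be re-wrapped in an FSDataInputStream.
    class TimedStream(in: FSDataInputStream)
        extends InputStream with Seekable with PositionedReadable {

      private def timed[T](body: => T): T = {
        val start = System.nanoTime()
        try body finally HdfsReadTime.nanos.addAndGet(System.nanoTime() - start)
      }

      override def read(): Int = timed(in.read())
      override def read(b: Array[Byte], off: Int, len: Int): Int =
        timed(in.read(b, off, len))
      override def close(): Unit = in.close()

      // Seekable
      override def seek(pos: Long): Unit = in.seek(pos)
      override def getPos(): Long = in.getPos
      override def seekToNewSource(targetPos: Long): Boolean =
        in.seekToNewSource(targetPos)

      // PositionedReadable
      override def read(pos: Long, buf: Array[Byte], off: Int, len: Int): Int =
        timed(in.read(pos, buf, off, len))
      override def readFully(pos: Long, buf: Array[Byte], off: Int, len: Int): Unit =
        timed(in.readFully(pos, buf, off, len))
      override def readFully(pos: Long, buf: Array[Byte]): Unit =
        timed(in.readFully(pos, buf))
    }

    // FilterFileSystem already delegates everything; only open() is intercepted.
    class TimedFileSystem(underlying: FileSystem)
        extends FilterFileSystem(underlying) {
      override def open(f: Path, bufferSize: Int): FSDataInputStream =
        new FSDataInputStream(new TimedStream(super.open(f, bufferSize)))
    }

Reads issued through such a wrapper would accumulate into the counter without
touching HDFS itself, which keeps the change entirely on the Spark side.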
>
>

Re: Adding HDFS read-time metrics per task (RE: SPARK-1683)

Posted by Steve Loughran <st...@hortonworks.com>.
On 12 May 2016, at 04:44, Brian Cho <ch...@gmail.com> wrote:

> Hi Kay,
>
> Thank you for the detailed explanation.
>
> If I understand correctly, I *could* time each record's processing by measuring the time spent in reader.next, but this would add overhead for every single record. This is the method that was abandoned because of performance regressions.
>
> The other possibility is changing HDFS first. This method looks promising even if it takes some time. I'll play around with it a bit for now. Thanks again!
>
> -Brian
>
> On Wed, May 11, 2016 at 4:45 PM, Kay Ousterhout <ke...@eecs.berkeley.edu> wrote:
>
>> Hi Brian,
>>
>> Unfortunately it's not possible to do this in Spark for two reasons. First, Spark reads records one at a time (e.g., if you're reading an HDFS file and performing some map function, one record will be read from HDFS, then the map function will be applied, then the next record will be read, etc.). The relevant code is here<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L209>: we create an iterator that's then passed on to other downstream RDDs. As a result, we'd need to time each record's processing, which adds too much overhead.
>>
>> The other potential issue is that we use the RecordReader interface, which means that we get deserialized and decompressed records, so any time we measured would include time to read the data from disk and decompress/deserialize it (not sure if you're trying to isolate the disk time).

Measuring decompression overhead alone is interesting. Indeed, with encryption at rest and erasure coding in Hadoop, you'd think about isolating work there too, to see where the bottlenecks move after a switch to SSDs.


>> It *is* possible to do this instrumentation for disk read time in HDFS, because HDFS reads larger blocks from disk (and then passes them to Spark one by one), and I did that (in a hacky way) in the most recent commits in this Hadoop branch<https://github.com/kayousterhout/hadoop-common/commits/2.0.2-instrumented>.  I filed a Hadoop JIRA <https://issues.apache.org/jira/browse/HADOOP-11873> to add this (in a less hacky way, using FileSystem.Statistics) but haven't submitted a patch for it.  If there's sufficient interest, I could properly implement the metrics and see if it could be merged into Hadoop, at which point Spark could start reading those metrics (unfortunately, the delay for this would be pretty significant because we'd need to wait for a new Hadoop version and then a new Spark version, and it would only be available in newer versions of Hadoop).

The metrics API changed 19 hours ago into something more sophisticated, though it doesn't measure timings.

https://issues.apache.org/jira/browse/HADOOP-13065

It's designed to be more extensible; you'll ask for a metric by name, not a compile-time field. This will let different filesystems add different values.

A few minutes ago, https://issues.apache.org/jira/browse/HADOOP-13028 went in to do some metric work for Spark; there the stats can be printed in logs, because the filesystem and input stream toString() methods return the metrics. That's for people, not machines; the text may break without warning. But you can at least dump the metrics in your logs to see what's going on. That stuff can be seen in downstream tests, but not directly published as metrics. The aggregate stats are also collected as metrics2 stats, which should somehow be convertible to Coda Hale metrics, and hence integrated with the rest of Spark's monitoring.


A more straightforward action might just be for Spark itself to subclass FilterFileSystem and implement operation timing there, both for the filesystem operations themselves and for any input/output streams returned by create & open.


Re: Adding HDFS read-time metrics per task (RE: SPARK-1683)

Posted by Brian Cho <ch...@gmail.com>.
Hi Kay,

Thank you for the detailed explanation.

If I understand correctly, I *could* time each record's processing by
measuring the time spent in reader.next, but this would add overhead for
every single record. This is the method that was abandoned because of
performance regressions.

The other possibility is changing HDFS first. This method looks promising
even if it takes some time. I'll play around with it a bit for now. Thanks
again!

-Brian

On Wed, May 11, 2016 at 4:45 PM, Kay Ousterhout <ke...@eecs.berkeley.edu>
wrote:

> Hi Brian,
>
> Unfortunately it's not possible to do this in Spark for two reasons.
> First, Spark reads records one at a time (e.g., if you're reading an
> HDFS file and performing some map function, one record will be read from
> HDFS, then the map function will be applied, then the next record will be
> read, etc.). The relevant code is here
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L209>:
> we create an iterator that's then passed on to other downstream RDDs.  As a
> result, we'd need to time each record's processing, which adds too much
> overhead.
>
> The other potential issue is that we use the RecordReader interface, which
> means that we get deserialized and decompressed records, so any time we
> measured would include time to read the data from disk and
> decompress/deserialize it (not sure if you're trying to isolate the disk
> time).
>
> It *is* possible to do this instrumentation for disk read time in HDFS,
> because HDFS reads larger blocks from disk (and then passes them to Spark
> one by one), and I did that (in a hacky way) in the most recent commits
> in this Hadoop branch
> <https://github.com/kayousterhout/hadoop-common/commits/2.0.2-instrumented>.
> I filed a Hadoop JIRA <https://issues.apache.org/jira/browse/HADOOP-11873> to
> add this (in a less hacky way, using FileSystem.Statistics) but haven't
> submitted a patch for it.  If there's sufficient interest, I could properly
> implement the metrics and see if it could be merged into Hadoop, at which
> point Spark could start reading those metrics (unfortunately, the delay for
> this would be pretty significant because we'd need to wait for a new Hadoop
> version and then a new Spark version, and it would only be available in
> newer versions of Hadoop).
>
> You may be wondering if it's possible to efficiently sample for this
> metric in Spark.  This won't work for the disk read time, because only a
> small number of Spark's calls to read() take a long time (the ones that
> cause a larger block to be read from disk).
>
> -Kay
>
>
> On Wed, May 11, 2016 at 2:01 PM, Reynold Xin <rx...@databricks.com> wrote:
>
>> Adding Kay
>>
>>
>> On Wed, May 11, 2016 at 12:01 PM, Brian Cho <ch...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm interested in adding read-time (from HDFS) to Task Metrics. The
>>> motivation is to help debug performance issues. After some digging, I found
>>> it briefly mentioned in SPARK-1683 that this feature didn't make it because
>>> metric collection caused a performance regression [1].
>>>
>>> I'd like to try tackling this, but would be very grateful if those with
>>> experience could give some more information on what was attempted previously
>>> and why it didn't work, or whether there are philosophical objections to
>>> these metrics. If you feel this is a dead end, please help save me from
>>> myself.
>>>
>>> Thank you,
>>> Brian
>>>
>>> [1] https://github.com/apache/spark/pull/962
>>>
>>
>>
>

Re: Adding HDFS read-time metrics per task (RE: SPARK-1683)

Posted by Reynold Xin <rx...@databricks.com>.
Adding Kay


On Wed, May 11, 2016 at 12:01 PM, Brian Cho <ch...@gmail.com> wrote:

> Hi,
>
> I'm interested in adding read-time (from HDFS) to Task Metrics. The
> motivation is to help debug performance issues. After some digging, I found
> it briefly mentioned in SPARK-1683 that this feature didn't make it because
> metric collection caused a performance regression [1].
>
> I'd like to try tackling this, but would be very grateful if those with
> experience could give some more information on what was attempted previously
> and why it didn't work, or whether there are philosophical objections to
> these metrics. If you feel this is a dead end, please help save me from
> myself.
>
> Thank you,
> Brian
>
> [1] https://github.com/apache/spark/pull/962
>