You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Ted Yu <yu...@gmail.com> on 2014/07/26 16:42:15 UTC

setting inputMetrics in HadoopRDD#compute()

Hi,
Starting at line 203:
      try {
        /* bytesRead may not exactly equal the bytes read by a task: split
boundaries aren't
         * always at record boundaries, so tasks may need to read into
other splits to complete
         * a record. */
        inputMetrics.bytesRead = split.inputSplit.value.getLength()
      } catch {
        case e: java.io.IOException =>
          logWarning("Unable to get input size to set InputMetrics for
task", e)
      }
      context.taskMetrics.inputMetrics = Some(inputMetrics)

If there is IOException, context.taskMetrics.inputMetrics is set by
wrapping inputMetrics - as if there wasn't any error.

I wonder if the above code should distinguish the error condition.

Cheers

Re: setting inputMetrics in HadoopRDD#compute()

Posted by Kay Ousterhout <ka...@gmail.com>.
Reynold you're totally right, as discussed offline -- I didn't think about
the limit use case when I wrote this.  Sandy, is it easy to fix this as
part of your patch to use StatisticsData?  If not, I can fix it in a
separate patch.


On Sat, Jul 26, 2014 at 12:12 PM, Reynold Xin <rx...@databricks.com> wrote:

> That makes sense, Sandy.
>
> When you add the patch, can you make sure you comment inline on why the
> fallback is needed?
>
>
>
> On Sat, Jul 26, 2014 at 11:46 AM, Sandy Ryza <sa...@cloudera.com>
> wrote:
>
>> I'm working on a patch that switches this stuff out with the Hadoop
>> FileSystem StatisticsData, which will both give an accurate count and
>> allow
>> us to get metrics while the task is in progress.  A hitch is that it
>> relies
>> on https://issues.apache.org/jira/browse/HADOOP-10688, so we still might
>> want a fallback for versions of Hadoop that don't have this API.
>>
>>
>> On Sat, Jul 26, 2014 at 10:47 AM, Reynold Xin <rx...@databricks.com>
>> wrote:
>>
>> > There is one piece of information that'd be useful to know, which is the
>> > source of the input. Even in the presence of an IOException, the input
>> > metrics still specifies the task is reading from Hadoop.
>> >
>> > However, I'm slightly confused by this -- I think usually we'd want to
>> > report the number of bytes read, rather than the total input size. For
>> > example, if there is a limit (only read the first 5 records), the actual
>> > number of bytes read is much smaller than the total split size.
>> >
>> > Kay, am I mis-interpreting this?
>> >
>> >
>> >
>> > On Sat, Jul 26, 2014 at 7:42 AM, Ted Yu <yu...@gmail.com> wrote:
>> >
>> > > Hi,
>> > > Starting at line 203:
>> > >       try {
>> > >         /* bytesRead may not exactly equal the bytes read by a task:
>> > split
>> > > boundaries aren't
>> > >          * always at record boundaries, so tasks may need to read into
>> > > other splits to complete
>> > >          * a record. */
>> > >         inputMetrics.bytesRead = split.inputSplit.value.getLength()
>> > >       } catch {
>> > >         case e: java.io.IOException =>
>> > >           logWarning("Unable to get input size to set InputMetrics for
>> > > task", e)
>> > >       }
>> > >       context.taskMetrics.inputMetrics = Some(inputMetrics)
>> > >
>> > > If there is IOException, context.taskMetrics.inputMetrics is set by
>> > > wrapping inputMetrics - as if there wasn't any error.
>> > >
>> > > I wonder if the above code should distinguish the error condition.
>> > >
>> > > Cheers
>> > >
>> >
>>
>
>

Re: setting inputMetrics in HadoopRDD#compute()

Posted by Reynold Xin <rx...@databricks.com>.
That makes sense, Sandy.

When you add the patch, can you make sure you comment inline on why the
fallback is needed?



On Sat, Jul 26, 2014 at 11:46 AM, Sandy Ryza <sa...@cloudera.com>
wrote:

> I'm working on a patch that switches this stuff out with the Hadoop
> FileSystem StatisticsData, which will both give an accurate count and allow
> us to get metrics while the task is in progress.  A hitch is that it relies
> on https://issues.apache.org/jira/browse/HADOOP-10688, so we still might
> want a fallback for versions of Hadoop that don't have this API.
>
>
> On Sat, Jul 26, 2014 at 10:47 AM, Reynold Xin <rx...@databricks.com> wrote:
>
> > There is one piece of information that'd be useful to know, which is the
> > source of the input. Even in the presence of an IOException, the input
> > metrics still specifies the task is reading from Hadoop.
> >
> > However, I'm slightly confused by this -- I think usually we'd want to
> > report the number of bytes read, rather than the total input size. For
> > example, if there is a limit (only read the first 5 records), the actual
> > number of bytes read is much smaller than the total split size.
> >
> > Kay, am I mis-interpreting this?
> >
> >
> >
> > On Sat, Jul 26, 2014 at 7:42 AM, Ted Yu <yu...@gmail.com> wrote:
> >
> > > Hi,
> > > Starting at line 203:
> > >       try {
> > >         /* bytesRead may not exactly equal the bytes read by a task:
> > split
> > > boundaries aren't
> > >          * always at record boundaries, so tasks may need to read into
> > > other splits to complete
> > >          * a record. */
> > >         inputMetrics.bytesRead = split.inputSplit.value.getLength()
> > >       } catch {
> > >         case e: java.io.IOException =>
> > >           logWarning("Unable to get input size to set InputMetrics for
> > > task", e)
> > >       }
> > >       context.taskMetrics.inputMetrics = Some(inputMetrics)
> > >
> > > If there is IOException, context.taskMetrics.inputMetrics is set by
> > > wrapping inputMetrics - as if there wasn't any error.
> > >
> > > I wonder if the above code should distinguish the error condition.
> > >
> > > Cheers
> > >
> >
>

Re: setting inputMetrics in HadoopRDD#compute()

Posted by Sandy Ryza <sa...@cloudera.com>.
I'm working on a patch that switches this stuff out with the Hadoop
FileSystem StatisticsData, which will both give an accurate count and allow
us to get metrics while the task is in progress.  A hitch is that it relies
on https://issues.apache.org/jira/browse/HADOOP-10688, so we still might
want a fallback for versions of Hadoop that don't have this API.


On Sat, Jul 26, 2014 at 10:47 AM, Reynold Xin <rx...@databricks.com> wrote:

> There is one piece of information that'd be useful to know, which is the
> source of the input. Even in the presence of an IOException, the input
> metrics still specifies the task is reading from Hadoop.
>
> However, I'm slightly confused by this -- I think usually we'd want to
> report the number of bytes read, rather than the total input size. For
> example, if there is a limit (only read the first 5 records), the actual
> number of bytes read is much smaller than the total split size.
>
> Kay, am I mis-interpreting this?
>
>
>
> On Sat, Jul 26, 2014 at 7:42 AM, Ted Yu <yu...@gmail.com> wrote:
>
> > Hi,
> > Starting at line 203:
> >       try {
> >         /* bytesRead may not exactly equal the bytes read by a task:
> split
> > boundaries aren't
> >          * always at record boundaries, so tasks may need to read into
> > other splits to complete
> >          * a record. */
> >         inputMetrics.bytesRead = split.inputSplit.value.getLength()
> >       } catch {
> >         case e: java.io.IOException =>
> >           logWarning("Unable to get input size to set InputMetrics for
> > task", e)
> >       }
> >       context.taskMetrics.inputMetrics = Some(inputMetrics)
> >
> > If there is IOException, context.taskMetrics.inputMetrics is set by
> > wrapping inputMetrics - as if there wasn't any error.
> >
> > I wonder if the above code should distinguish the error condition.
> >
> > Cheers
> >
>

Re: setting inputMetrics in HadoopRDD#compute()

Posted by Reynold Xin <rx...@databricks.com>.
There is one piece of information that'd be useful to know, which is the
source of the input. Even in the presence of an IOException, the input
metrics still specifies the task is reading from Hadoop.

However, I'm slightly confused by this -- I think usually we'd want to
report the number of bytes read, rather than the total input size. For
example, if there is a limit (only read the first 5 records), the actual
number of bytes read is much smaller than the total split size.

Kay, am I mis-interpreting this?



On Sat, Jul 26, 2014 at 7:42 AM, Ted Yu <yu...@gmail.com> wrote:

> Hi,
> Starting at line 203:
>       try {
>         /* bytesRead may not exactly equal the bytes read by a task: split
> boundaries aren't
>          * always at record boundaries, so tasks may need to read into
> other splits to complete
>          * a record. */
>         inputMetrics.bytesRead = split.inputSplit.value.getLength()
>       } catch {
>         case e: java.io.IOException =>
>           logWarning("Unable to get input size to set InputMetrics for
> task", e)
>       }
>       context.taskMetrics.inputMetrics = Some(inputMetrics)
>
> If there is IOException, context.taskMetrics.inputMetrics is set by
> wrapping inputMetrics - as if there wasn't any error.
>
> I wonder if the above code should distinguish the error condition.
>
> Cheers
>