You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Mike Hynes <91...@gmail.com> on 2015/06/05 18:51:33 UTC

Scheduler question: stages with non-arithmetic numbering

Hi folks,

When I look at the output logs for an iterative Spark program, I see
that the stage IDs are not arithmetically numbered---that is, there
are gaps between stages and I might find log information about Stage
0, 1,2, 5, but not 3 or 4.

As an example, the output from the Spark logs below shows what I mean:

# grep -rE "Stage [[:digit:]]+" spark_stderr  | grep finished
12048:INFO:DAGScheduler:Stage 0 (mapPartitions at blockMap.scala:1444)
finished in 7.820 s:
15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) finished
in 3.874 s:
18291:INFO:DAGScheduler:Stage 2 (count at blockMap.scala:1179)
finished in 2.237 s:
20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) finished
in 1.749 s:
21254:INFO:DAGScheduler:Stage 5 (count at blockMap.scala:1180)
finished in 1.082 s:
23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) finished
in 2.078 s:
24773:INFO:DAGScheduler:Stage 8 (count at blockMap.scala:1188)
finished in 1.317 s:
26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817) finished
in 1.638 s:
27228:INFO:DAGScheduler:Stage 11 (count at blockMap.scala:1189)
finished in 0.732 s:
27494:INFO:DAGScheduler:Stage 14 (foreach at blockMap.scala:1302)
finished in 0.192 s:
27709:INFO:DAGScheduler:Stage 17 (foreach at blockMap.scala:1302)
finished in 0.170 s:
28018:INFO:DAGScheduler:Stage 20 (count at blockMap.scala:1201)
finished in 0.270 s:
28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355) finished
in 0.455 s:
29598:INFO:DAGScheduler:Stage 24 (count at blockMap.scala:274)
finished in 0.928 s:
29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355) finished
in 0.305 s:
30390:INFO:DAGScheduler:Stage 28 (count at blockMap.scala:275)
finished in 0.391 s:
30452:INFO:DAGScheduler:Stage 32 (first at
MatrixFactorizationModel.scala:60) finished in 0.028 s:
30506:INFO:DAGScheduler:Stage 36 (first at
MatrixFactorizationModel.scala:60) finished in 0.023 s:

Can anyone comment on this being normal behavior? Is it indicative of
faults causing stages to be resubmitted? I also cannot find the
missing stages in any stage's parent List(Stage x, Stage y, ...)

Thanks,
Mike


On 6/1/15, Reynold Xin <rx...@databricks.com> wrote:
> Thanks, René. I actually added a warning to the new JDBC reader/writer
> interface for 1.4.0.
>
> Even with that, I think we should support throttling JDBC; otherwise it's
> too convenient for our users to DOS their production database servers!
>
>
>   /**
>    * Construct a [[DataFrame]] representing the database table accessible
> via JDBC URL
>    * url named table. Partitions of the table will be retrieved in parallel
> based on the parameters
>    * passed to this function.
>    *
> *   * Don't create too many partitions in parallel on a large cluster;
> otherwise Spark might crash*
> *   * your external database systems.*
>    *
>    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
>    * @param table Name of the table in the external database.
>    * @param columnName the name of a column of integral type that will be
> used for partitioning.
>    * @param lowerBound the minimum value of `columnName` used to decide
> partition stride
>    * @param upperBound the maximum value of `columnName` used to decide
> partition stride
>    * @param numPartitions the number of partitions.  the range
> `minValue`-`maxValue` will be split
>    *                      evenly into this many partitions
>    * @param connectionProperties JDBC database connection arguments, a list
> of arbitrary string
>    *                             tag/value. Normally at least a "user" and
> "password" property
>    *                             should be included.
>    *
>    * @since 1.4.0
>    */
>
>
> On Mon, Jun 1, 2015 at 1:54 AM, René Treffer <rt...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm using sqlContext.jdbc(uri, table, where).map(_ =>
>> 1).aggregate(0)(_+_,_+_) on an interactive shell (where "where" is an
>> Array[String] of 32 to 48 elements).  (The code is tailored to your db,
>> specifically through the where conditions, I'd have otherwise post it)
>> That should be the DataFrame API, but I'm just trying to load everything
>> and discard it as soon as possible :-)
>>
>> (1) Never do a silent drop of the values by default: it kills confidence.
>> An option sounds reasonable.  Some sort of insight / log would be great.
>> (How many columns of what type were truncated? why?)
>> Note that I could declare the field as string via JdbcDialects (thank you
>> guys for merging that :-) ).
>> I have quite bad experiences with silent drops / truncates of columns and
>> thus _like_ the strict way of spark. It causes trouble but noticing later
>> that your data was corrupted during conversion is even worse.
>>
>> (2) SPARK-8004 https://issues.apache.org/jira/browse/SPARK-8004
>>
>> (3) One option would be to make it safe to use, the other option would be
>> to document the behavior (s.th. like "WARNING: this method tries to load
>> as many partitions as possible, make sure your database can handle the
>> load
>> or load them in chunks and use union"). SPARK-8008
>> https://issues.apache.org/jira/browse/SPARK-8008
>>
>> Regards,
>>   Rene Treffer
>>
>


-- 
Thanks,
Mike

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Scheduler question: stages with non-arithmetic numbering

Posted by Patrick Wendell <pw...@gmail.com>.
Hey Mike,

Stage ID's are not guaranteed to be sequential because of the way the
DAG scheduler works (only increasing). In some cases stage ID numbers
are skipped when stages are generated.

Any stage/ID that appears in the Spark UI is an actual stage, so if
you see ID's in there, but they are not in the logs, then let us know
(that would be a bug).

- Patrick

On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> Are you seeing the same behavior on the driver UI? (that running on port
> 4040), If you click on the stage id header you can sort the stages based on
> IDs.
>
> Thanks
> Best Regards
>
> On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes <91...@gmail.com> wrote:
>>
>> Hi folks,
>>
>> When I look at the output logs for an iterative Spark program, I see
>> that the stage IDs are not arithmetically numbered---that is, there
>> are gaps between stages and I might find log information about Stage
>> 0, 1,2, 5, but not 3 or 4.
>>
>> As an example, the output from the Spark logs below shows what I mean:
>>
>> # grep -rE "Stage [[:digit:]]+" spark_stderr  | grep finished
>> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at blockMap.scala:1444)
>> finished in 7.820 s:
>> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) finished
>> in 3.874 s:
>> 18291:INFO:DAGScheduler:Stage 2 (count at blockMap.scala:1179)
>> finished in 2.237 s:
>> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) finished
>> in 1.749 s:
>> 21254:INFO:DAGScheduler:Stage 5 (count at blockMap.scala:1180)
>> finished in 1.082 s:
>> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) finished
>> in 2.078 s:
>> 24773:INFO:DAGScheduler:Stage 8 (count at blockMap.scala:1188)
>> finished in 1.317 s:
>> 26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817) finished
>> in 1.638 s:
>> 27228:INFO:DAGScheduler:Stage 11 (count at blockMap.scala:1189)
>> finished in 0.732 s:
>> 27494:INFO:DAGScheduler:Stage 14 (foreach at blockMap.scala:1302)
>> finished in 0.192 s:
>> 27709:INFO:DAGScheduler:Stage 17 (foreach at blockMap.scala:1302)
>> finished in 0.170 s:
>> 28018:INFO:DAGScheduler:Stage 20 (count at blockMap.scala:1201)
>> finished in 0.270 s:
>> 28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355) finished
>> in 0.455 s:
>> 29598:INFO:DAGScheduler:Stage 24 (count at blockMap.scala:274)
>> finished in 0.928 s:
>> 29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355) finished
>> in 0.305 s:
>> 30390:INFO:DAGScheduler:Stage 28 (count at blockMap.scala:275)
>> finished in 0.391 s:
>> 30452:INFO:DAGScheduler:Stage 32 (first at
>> MatrixFactorizationModel.scala:60) finished in 0.028 s:
>> 30506:INFO:DAGScheduler:Stage 36 (first at
>> MatrixFactorizationModel.scala:60) finished in 0.023 s:
>>
>> Can anyone comment on this being normal behavior? Is it indicative of
>> faults causing stages to be resubmitted? I also cannot find the
>> missing stages in any stage's parent List(Stage x, Stage y, ...)
>>
>> Thanks,
>> Mike
>>
>>
>> On 6/1/15, Reynold Xin <rx...@databricks.com> wrote:
>> > Thanks, René. I actually added a warning to the new JDBC reader/writer
>> > interface for 1.4.0.
>> >
>> > Even with that, I think we should support throttling JDBC; otherwise
>> > it's
>> > too convenient for our users to DOS their production database servers!
>> >
>> >
>> >   /**
>> >    * Construct a [[DataFrame]] representing the database table
>> > accessible
>> > via JDBC URL
>> >    * url named table. Partitions of the table will be retrieved in
>> > parallel
>> > based on the parameters
>> >    * passed to this function.
>> >    *
>> > *   * Don't create too many partitions in parallel on a large cluster;
>> > otherwise Spark might crash*
>> > *   * your external database systems.*
>> >    *
>> >    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
>> >    * @param table Name of the table in the external database.
>> >    * @param columnName the name of a column of integral type that will
>> > be
>> > used for partitioning.
>> >    * @param lowerBound the minimum value of `columnName` used to decide
>> > partition stride
>> >    * @param upperBound the maximum value of `columnName` used to decide
>> > partition stride
>> >    * @param numPartitions the number of partitions.  the range
>> > `minValue`-`maxValue` will be split
>> >    *                      evenly into this many partitions
>> >    * @param connectionProperties JDBC database connection arguments, a
>> > list
>> > of arbitrary string
>> >    *                             tag/value. Normally at least a "user"
>> > and
>> > "password" property
>> >    *                             should be included.
>> >    *
>> >    * @since 1.4.0
>> >    */
>> >
>> >
>> > On Mon, Jun 1, 2015 at 1:54 AM, René Treffer <rt...@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> I'm using sqlContext.jdbc(uri, table, where).map(_ =>
>> >> 1).aggregate(0)(_+_,_+_) on an interactive shell (where "where" is an
>> >> Array[String] of 32 to 48 elements).  (The code is tailored to your db,
>> >> specifically through the where conditions, I'd have otherwise post it)
>> >> That should be the DataFrame API, but I'm just trying to load
>> >> everything
>> >> and discard it as soon as possible :-)
>> >>
>> >> (1) Never do a silent drop of the values by default: it kills
>> >> confidence.
>> >> An option sounds reasonable.  Some sort of insight / log would be
>> >> great.
>> >> (How many columns of what type were truncated? why?)
>> >> Note that I could declare the field as string via JdbcDialects (thank
>> >> you
>> >> guys for merging that :-) ).
>> >> I have quite bad experiences with silent drops / truncates of columns
>> >> and
>> >> thus _like_ the strict way of spark. It causes trouble but noticing
>> >> later
>> >> that your data was corrupted during conversion is even worse.
>> >>
>> >> (2) SPARK-8004 https://issues.apache.org/jira/browse/SPARK-8004
>> >>
>> >> (3) One option would be to make it safe to use, the other option would
>> >> be
>> >> to document the behavior (s.th. like "WARNING: this method tries to
>> >> load
>> >> as many partitions as possible, make sure your database can handle the
>> >> load
>> >> or load them in chunks and use union"). SPARK-8008
>> >> https://issues.apache.org/jira/browse/SPARK-8008
>> >>
>> >> Regards,
>> >>   Rene Treffer
>> >>
>> >
>>
>>
>> --
>> Thanks,
>> Mike
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>> For additional commands, e-mail: dev-help@spark.apache.org
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Scheduler question: stages with non-arithmetic numbering

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Are you seeing the same behavior on the driver UI? (that running on port
4040), If you click on the stage id header you can sort the stages based on
IDs.

Thanks
Best Regards

On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes <91...@gmail.com> wrote:

> Hi folks,
>
> When I look at the output logs for an iterative Spark program, I see
> that the stage IDs are not arithmetically numbered---that is, there
> are gaps between stages and I might find log information about Stage
> 0, 1,2, 5, but not 3 or 4.
>
> As an example, the output from the Spark logs below shows what I mean:
>
> # grep -rE "Stage [[:digit:]]+" spark_stderr  | grep finished
> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at blockMap.scala:1444)
> finished in 7.820 s:
> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) finished
> in 3.874 s:
> 18291:INFO:DAGScheduler:Stage 2 (count at blockMap.scala:1179)
> finished in 2.237 s:
> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) finished
> in 1.749 s:
> 21254:INFO:DAGScheduler:Stage 5 (count at blockMap.scala:1180)
> finished in 1.082 s:
> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) finished
> in 2.078 s:
> 24773:INFO:DAGScheduler:Stage 8 (count at blockMap.scala:1188)
> finished in 1.317 s:
> 26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817) finished
> in 1.638 s:
> 27228:INFO:DAGScheduler:Stage 11 (count at blockMap.scala:1189)
> finished in 0.732 s:
> 27494:INFO:DAGScheduler:Stage 14 (foreach at blockMap.scala:1302)
> finished in 0.192 s:
> 27709:INFO:DAGScheduler:Stage 17 (foreach at blockMap.scala:1302)
> finished in 0.170 s:
> 28018:INFO:DAGScheduler:Stage 20 (count at blockMap.scala:1201)
> finished in 0.270 s:
> 28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355) finished
> in 0.455 s:
> 29598:INFO:DAGScheduler:Stage 24 (count at blockMap.scala:274)
> finished in 0.928 s:
> 29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355) finished
> in 0.305 s:
> 30390:INFO:DAGScheduler:Stage 28 (count at blockMap.scala:275)
> finished in 0.391 s:
> 30452:INFO:DAGScheduler:Stage 32 (first at
> MatrixFactorizationModel.scala:60) finished in 0.028 s:
> 30506:INFO:DAGScheduler:Stage 36 (first at
> MatrixFactorizationModel.scala:60) finished in 0.023 s:
>
> Can anyone comment on this being normal behavior? Is it indicative of
> faults causing stages to be resubmitted? I also cannot find the
> missing stages in any stage's parent List(Stage x, Stage y, ...)
>
> Thanks,
> Mike
>
>
> On 6/1/15, Reynold Xin <rx...@databricks.com> wrote:
> > Thanks, René. I actually added a warning to the new JDBC reader/writer
> > interface for 1.4.0.
> >
> > Even with that, I think we should support throttling JDBC; otherwise it's
> > too convenient for our users to DOS their production database servers!
> >
> >
> >   /**
> >    * Construct a [[DataFrame]] representing the database table accessible
> > via JDBC URL
> >    * url named table. Partitions of the table will be retrieved in
> parallel
> > based on the parameters
> >    * passed to this function.
> >    *
> > *   * Don't create too many partitions in parallel on a large cluster;
> > otherwise Spark might crash*
> > *   * your external database systems.*
> >    *
> >    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
> >    * @param table Name of the table in the external database.
> >    * @param columnName the name of a column of integral type that will be
> > used for partitioning.
> >    * @param lowerBound the minimum value of `columnName` used to decide
> > partition stride
> >    * @param upperBound the maximum value of `columnName` used to decide
> > partition stride
> >    * @param numPartitions the number of partitions.  the range
> > `minValue`-`maxValue` will be split
> >    *                      evenly into this many partitions
> >    * @param connectionProperties JDBC database connection arguments, a
> list
> > of arbitrary string
> >    *                             tag/value. Normally at least a "user"
> and
> > "password" property
> >    *                             should be included.
> >    *
> >    * @since 1.4.0
> >    */
> >
> >
> > On Mon, Jun 1, 2015 at 1:54 AM, René Treffer <rt...@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> I'm using sqlContext.jdbc(uri, table, where).map(_ =>
> >> 1).aggregate(0)(_+_,_+_) on an interactive shell (where "where" is an
> >> Array[String] of 32 to 48 elements).  (The code is tailored to your db,
> >> specifically through the where conditions, I'd have otherwise post it)
> >> That should be the DataFrame API, but I'm just trying to load everything
> >> and discard it as soon as possible :-)
> >>
> >> (1) Never do a silent drop of the values by default: it kills
> confidence.
> >> An option sounds reasonable.  Some sort of insight / log would be great.
> >> (How many columns of what type were truncated? why?)
> >> Note that I could declare the field as string via JdbcDialects (thank
> you
> >> guys for merging that :-) ).
> >> I have quite bad experiences with silent drops / truncates of columns
> and
> >> thus _like_ the strict way of spark. It causes trouble but noticing
> later
> >> that your data was corrupted during conversion is even worse.
> >>
> >> (2) SPARK-8004 https://issues.apache.org/jira/browse/SPARK-8004
> >>
> >> (3) One option would be to make it safe to use, the other option would
> be
> >> to document the behavior (s.th. like "WARNING: this method tries to
> load
> >> as many partitions as possible, make sure your database can handle the
> >> load
> >> or load them in chunks and use union"). SPARK-8008
> >> https://issues.apache.org/jira/browse/SPARK-8008
> >>
> >> Regards,
> >>   Rene Treffer
> >>
> >
>
>
> --
> Thanks,
> Mike
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> For additional commands, e-mail: dev-help@spark.apache.org
>
>