Posted to commits@spark.apache.org by an...@apache.org on 2015/05/05 01:21:43 UTC
[2/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 31c07c7..7f7c7ed 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -25,7 +25,7 @@ import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
-import org.apache.hadoop.io.{Writable, BytesWritable, NullWritable, Text}
+import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.TextOutputFormat
@@ -277,12 +277,20 @@ abstract class RDD[T: ClassTag](
if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}
+ /**
+ * Execute a block of code in a scope such that all new RDDs created in this body will
+ * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
+ *
+ * Note: Return statements are NOT allowed in the given body.
+ */
+ private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
+
// Transformations (return a new RDD)
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
- def map[U: ClassTag](f: T => U): RDD[U] = {
+ def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
@@ -291,7 +299,7 @@ abstract class RDD[T: ClassTag](
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
- def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
+ def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
@@ -299,7 +307,7 @@ abstract class RDD[T: ClassTag](
/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
- def filter(f: T => Boolean): RDD[T] = {
+ def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
@@ -310,13 +318,16 @@ abstract class RDD[T: ClassTag](
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
- def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
+ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+ }
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
- def distinct(): RDD[T] = distinct(partitions.length)
+ def distinct(): RDD[T] = withScope {
+ distinct(partitions.length)
+ }
/**
* Return a new RDD that has exactly numPartitions partitions.
@@ -327,7 +338,7 @@ abstract class RDD[T: ClassTag](
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
- def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = {
+ def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
@@ -352,7 +363,7 @@ abstract class RDD[T: ClassTag](
* data distributed using a hash partitioner.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
- : RDD[T] = {
+ : RDD[T] = withScope {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
@@ -377,16 +388,17 @@ abstract class RDD[T: ClassTag](
/**
* Return a sampled subset of this RDD.
- *
+ *
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be >= 0
* @param seed seed for the random number generator
*/
- def sample(withReplacement: Boolean,
+ def sample(
+ withReplacement: Boolean,
fraction: Double,
- seed: Long = Utils.random.nextLong): RDD[T] = {
+ seed: Long = Utils.random.nextLong): RDD[T] = withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
@@ -403,7 +415,9 @@ abstract class RDD[T: ClassTag](
*
* @return split RDDs in an array
*/
- def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
+ def randomSplit(
+ weights: Array[Double],
+ seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
@@ -435,7 +449,9 @@ abstract class RDD[T: ClassTag](
* @param seed seed for the random number generator
* @return sample of specified size in an array
*/
- def takeSample(withReplacement: Boolean,
+ // TODO: rewrite this without return statements so we can wrap it in a scope
+ def takeSample(
+ withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T] = {
val numStDev = 10.0
@@ -483,7 +499,7 @@ abstract class RDD[T: ClassTag](
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
*/
- def union(other: RDD[T]): RDD[T] = {
+ def union(other: RDD[T]): RDD[T] = withScope {
if (partitioner.isDefined && other.partitioner == partitioner) {
new PartitionerAwareUnionRDD(sc, Array(this, other))
} else {
@@ -495,7 +511,9 @@ abstract class RDD[T: ClassTag](
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
*/
- def ++(other: RDD[T]): RDD[T] = this.union(other)
+ def ++(other: RDD[T]): RDD[T] = withScope {
+ this.union(other)
+ }
/**
* Return this RDD sorted by the given key function.
@@ -504,10 +522,11 @@ abstract class RDD[T: ClassTag](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
- (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
+ (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
+ }
/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
@@ -515,7 +534,7 @@ abstract class RDD[T: ClassTag](
*
* Note that this method performs a shuffle internally.
*/
- def intersection(other: RDD[T]): RDD[T] = {
+ def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
@@ -529,8 +548,9 @@ abstract class RDD[T: ClassTag](
*
* @param partitioner Partitioner to use for the resulting RDD
*/
- def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null)
- : RDD[T] = {
+ def intersection(
+ other: RDD[T],
+ partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
@@ -544,16 +564,14 @@ abstract class RDD[T: ClassTag](
*
* @param numPartitions How many partitions to use in the resulting RDD
*/
- def intersection(other: RDD[T], numPartitions: Int): RDD[T] = {
- this.map(v => (v, null)).cogroup(other.map(v => (v, null)), new HashPartitioner(numPartitions))
- .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
- .keys
+ def intersection(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
+ intersection(other, new HashPartitioner(numPartitions))
}
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
- def glom(): RDD[Array[T]] = {
+ def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
@@ -561,7 +579,9 @@ abstract class RDD[T: ClassTag](
* Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
* elements (a, b) where a is in `this` and b is in `other`.
*/
- def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
+ def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
+ new CartesianRDD(sc, this, other)
+ }
/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
@@ -572,8 +592,9 @@ abstract class RDD[T: ClassTag](
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
- def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
+ def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy[K](f, defaultPartitioner(this))
+ }
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
@@ -584,8 +605,11 @@ abstract class RDD[T: ClassTag](
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
- def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
+ def groupBy[K](
+ f: T => K,
+ numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy(f, new HashPartitioner(numPartitions))
+ }
/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
@@ -597,7 +621,7 @@ abstract class RDD[T: ClassTag](
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
- : RDD[(K, Iterable[T])] = {
+ : RDD[(K, Iterable[T])] = withScope {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
@@ -605,13 +629,16 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD created by piping elements to a forked external process.
*/
- def pipe(command: String): RDD[String] = new PipedRDD(this, command)
+ def pipe(command: String): RDD[String] = withScope {
+ new PipedRDD(this, command)
+ }
/**
* Return an RDD created by piping elements to a forked external process.
*/
- def pipe(command: String, env: Map[String, String]): RDD[String] =
+ def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
new PipedRDD(this, command, env)
+ }
/**
* Return an RDD created by piping elements to a forked external process.
@@ -619,7 +646,7 @@ abstract class RDD[T: ClassTag](
*
* @param command command to run in forked process.
* @param env environment variables to set.
- * @param printPipeContext Before piping elements, this function is called as an oppotunity
+ * @param printPipeContext Before piping elements, this function is called as an opportunity
* to pipe context data. Print line function (like out.println) will be
* passed as printPipeContext's parameter.
* @param printRDDElement Use this function to customize how to pipe elements. This function
@@ -637,7 +664,7 @@ abstract class RDD[T: ClassTag](
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null,
- separateWorkingDir: Boolean = false): RDD[String] = {
+ separateWorkingDir: Boolean = false): RDD[String] = withScope {
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null,
@@ -651,7 +678,7 @@ abstract class RDD[T: ClassTag](
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitions[U: ClassTag](
- f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
+ f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}
@@ -664,7 +691,8 @@ abstract class RDD[T: ClassTag](
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
- f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
+ f: (Int, Iterator[T]) => Iterator[U],
+ preservesPartitioning: Boolean = false): RDD[U] = withScope {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}
@@ -681,7 +709,7 @@ abstract class RDD[T: ClassTag](
@deprecated("use TaskContext.get", "1.2.0")
def mapPartitionsWithContext[U: ClassTag](
f: (TaskContext, Iterator[T]) => Iterator[U],
- preservesPartitioning: Boolean = false): RDD[U] = {
+ preservesPartitioning: Boolean = false): RDD[U] = withScope {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}
@@ -692,7 +720,8 @@ abstract class RDD[T: ClassTag](
*/
@deprecated("use mapPartitionsWithIndex", "0.7.0")
def mapPartitionsWithSplit[U: ClassTag](
- f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
+ f: (Int, Iterator[T]) => Iterator[U],
+ preservesPartitioning: Boolean = false): RDD[U] = withScope {
mapPartitionsWithIndex(f, preservesPartitioning)
}
@@ -704,7 +733,7 @@ abstract class RDD[T: ClassTag](
@deprecated("use mapPartitionsWithIndex", "1.0.0")
def mapWith[A, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
- (f: (T, A) => U): RDD[U] = {
+ (f: (T, A) => U): RDD[U] = withScope {
mapPartitionsWithIndex((index, iter) => {
val a = constructA(index)
iter.map(t => f(t, a))
@@ -719,7 +748,7 @@ abstract class RDD[T: ClassTag](
@deprecated("use mapPartitionsWithIndex and flatMap", "1.0.0")
def flatMapWith[A, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
- (f: (T, A) => Seq[U]): RDD[U] = {
+ (f: (T, A) => Seq[U]): RDD[U] = withScope {
mapPartitionsWithIndex((index, iter) => {
val a = constructA(index)
iter.flatMap(t => f(t, a))
@@ -732,11 +761,11 @@ abstract class RDD[T: ClassTag](
* partition with the index of that partition.
*/
@deprecated("use mapPartitionsWithIndex and foreach", "1.0.0")
- def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit) {
+ def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope {
mapPartitionsWithIndex { (index, iter) =>
val a = constructA(index)
iter.map(t => {f(t, a); t})
- }.foreach(_ => {})
+ }
}
/**
@@ -745,7 +774,7 @@ abstract class RDD[T: ClassTag](
* partition with the index of that partition.
*/
@deprecated("use mapPartitionsWithIndex and filter", "1.0.0")
- def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
+ def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope {
mapPartitionsWithIndex((index, iter) => {
val a = constructA(index)
iter.filter(t => p(t, a))
@@ -758,7 +787,7 @@ abstract class RDD[T: ClassTag](
* partitions* and the *same number of elements in each partition* (e.g. one was made through
* a map on the other).
*/
- def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
+ def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
new Iterator[(T, U)] {
def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
@@ -780,33 +809,39 @@ abstract class RDD[T: ClassTag](
*/
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B], preservesPartitioning: Boolean)
- (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
+ (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
+ }
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B])
- (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)
+ (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
+ zipPartitions(rdd2, preservesPartitioning = false)(f)
+ }
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
- (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
+ (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope {
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
+ }
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C])
- (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false)
+ (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope {
+ zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f)
+ }
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
- (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
+ (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope {
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
+ }
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
- (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)
+ (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope {
+ zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
+ }
// Actions (launch a job to return a value to the user program)
@@ -814,7 +849,7 @@ abstract class RDD[T: ClassTag](
/**
* Applies a function f to all elements of this RDD.
*/
- def foreach(f: T => Unit) {
+ def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
@@ -822,7 +857,7 @@ abstract class RDD[T: ClassTag](
/**
* Applies a function f to each partition of this RDD.
*/
- def foreachPartition(f: Iterator[T] => Unit) {
+ def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
@@ -830,7 +865,7 @@ abstract class RDD[T: ClassTag](
/**
* Return an array that contains all of the elements in this RDD.
*/
- def collect(): Array[T] = {
+ def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
@@ -840,7 +875,7 @@ abstract class RDD[T: ClassTag](
*
* The iterator will consume as much memory as the largest partition in this RDD.
*/
- def toLocalIterator: Iterator[T] = {
+ def toLocalIterator: Iterator[T] = withScope {
def collectPartition(p: Int): Array[T] = {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
}
@@ -851,12 +886,14 @@ abstract class RDD[T: ClassTag](
* Return an array that contains all of the elements in this RDD.
*/
@deprecated("use collect", "1.0.0")
- def toArray(): Array[T] = collect()
+ def toArray(): Array[T] = withScope {
+ collect()
+ }
/**
* Return an RDD that contains all matching values by applying `f`.
*/
- def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = {
+ def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
filter(f.isDefinedAt).map(f)
}
@@ -866,19 +903,23 @@ abstract class RDD[T: ClassTag](
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
- def subtract(other: RDD[T]): RDD[T] =
+ def subtract(other: RDD[T]): RDD[T] = withScope {
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
+ }
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
- def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
+ def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
subtract(other, new HashPartitioner(numPartitions))
+ }
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
- def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {
+ def subtract(
+ other: RDD[T],
+ p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
if (partitioner == Some(p)) {
// Our partitioner knows how to handle T (which, since we have a partitioner, is
// really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
@@ -900,7 +941,7 @@ abstract class RDD[T: ClassTag](
* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
- def reduce(f: (T, T) => T): T = {
+ def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
@@ -929,7 +970,7 @@ abstract class RDD[T: ClassTag](
* @param depth suggested depth of the tree (default: 2)
* @see [[org.apache.spark.rdd.RDD#reduce]]
*/
- def treeReduce(f: (T, T) => T, depth: Int = 2): T = {
+ def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
val cleanF = context.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
@@ -961,7 +1002,7 @@ abstract class RDD[T: ClassTag](
* modify t1 and return it as its result value to avoid object allocation; however, it should not
* modify t2.
*/
- def fold(zeroValue: T)(op: (T, T) => T): T = {
+ def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
@@ -979,7 +1020,7 @@ abstract class RDD[T: ClassTag](
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
*/
- def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
+ def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
@@ -999,26 +1040,29 @@ abstract class RDD[T: ClassTag](
def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U,
- depth: Int = 2): U = {
+ depth: Int = 2): U = withScope {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
if (partitions.length == 0) {
- return Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
- }
- val cleanSeqOp = context.clean(seqOp)
- val cleanCombOp = context.clean(combOp)
- val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
- var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
- var numPartitions = partiallyAggregated.partitions.length
- val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
- // If creating an extra level doesn't help reduce the wall-clock time, we stop tree aggregation.
- while (numPartitions > scale + numPartitions / scale) {
- numPartitions /= scale
- val curNumPartitions = numPartitions
- partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) =>
- iter.map((i % curNumPartitions, _))
- }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
+ Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
+ } else {
+ val cleanSeqOp = context.clean(seqOp)
+ val cleanCombOp = context.clean(combOp)
+ val aggregatePartition =
+ (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+ var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
+ var numPartitions = partiallyAggregated.partitions.length
+ val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
+ // If creating an extra level doesn't help reduce
+ // the wall-clock time, we stop tree aggregation.
+ while (numPartitions > scale + numPartitions / scale) {
+ numPartitions /= scale
+ val curNumPartitions = numPartitions
+ partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
+ (i, iter) => iter.map((i % curNumPartitions, _))
+ }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
+ }
+ partiallyAggregated.reduce(cleanCombOp)
}
- partiallyAggregated.reduce(cleanCombOp)
}
/**
@@ -1032,7 +1076,9 @@ abstract class RDD[T: ClassTag](
* within a timeout, even if not all tasks have finished.
*/
@Experimental
- def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+ def countApprox(
+ timeout: Long,
+ confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
var result = 0L
while (iter.hasNext) {
@@ -1053,7 +1099,7 @@ abstract class RDD[T: ClassTag](
* To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which
* returns an RDD[T, Long] instead of a map.
*/
- def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = {
+ def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
map(value => (value, null)).countByKey()
}
@@ -1064,8 +1110,7 @@ abstract class RDD[T: ClassTag](
@Experimental
def countByValueApprox(timeout: Long, confidence: Double = 0.95)
(implicit ord: Ordering[T] = null)
- : PartialResult[Map[T, BoundedDouble]] =
- {
+ : PartialResult[Map[T, BoundedDouble]] = withScope {
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
@@ -1098,7 +1143,7 @@ abstract class RDD[T: ClassTag](
* If `sp` equals 0, the sparse representation is skipped.
*/
@Experimental
- def countApproxDistinct(p: Int, sp: Int): Long = {
+ def countApproxDistinct(p: Int, sp: Int): Long = withScope {
require(p >= 4, s"p ($p) must be at least 4")
require(sp <= 32, s"sp ($sp) cannot be greater than 32")
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
@@ -1124,7 +1169,7 @@ abstract class RDD[T: ClassTag](
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
*/
- def countApproxDistinct(relativeSD: Double = 0.05): Long = {
+ def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope {
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
countApproxDistinct(p, 0)
}
@@ -1142,7 +1187,9 @@ abstract class RDD[T: ClassTag](
* and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
- def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this)
+ def zipWithIndex(): RDD[(T, Long)] = withScope {
+ new ZippedWithIndexRDD(this)
+ }
/**
* Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
@@ -1154,7 +1201,7 @@ abstract class RDD[T: ClassTag](
* and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
- def zipWithUniqueId(): RDD[(T, Long)] = {
+ def zipWithUniqueId(): RDD[(T, Long)] = withScope {
val n = this.partitions.length.toLong
this.mapPartitionsWithIndex { case (k, iter) =>
iter.zipWithIndex.map { case (item, i) =>
@@ -1171,48 +1218,50 @@ abstract class RDD[T: ClassTag](
* @note due to complications in the internal implementation, this method will raise
* an exception if called on an RDD of `Nothing` or `Null`.
*/
- def take(num: Int): Array[T] = {
+ def take(num: Int): Array[T] = withScope {
if (num == 0) {
- return new Array[T](0)
- }
-
- val buf = new ArrayBuffer[T]
- val totalParts = this.partitions.length
- var partsScanned = 0
- while (buf.size < num && partsScanned < totalParts) {
- // The number of partitions to try in this iteration. It is ok for this number to be
- // greater than totalParts because we actually cap it at totalParts in runJob.
- var numPartsToTry = 1
- if (partsScanned > 0) {
- // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
- // interpolate the number of partitions we need to try, but overestimate it by 50%.
- // We also cap the estimation in the end.
- if (buf.size == 0) {
- numPartsToTry = partsScanned * 4
- } else {
- // the left side of max is >=1 whenever partsScanned >= 2
- numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
- numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+ new Array[T](0)
+ } else {
+ val buf = new ArrayBuffer[T]
+ val totalParts = this.partitions.length
+ var partsScanned = 0
+ while (buf.size < num && partsScanned < totalParts) {
+ // The number of partitions to try in this iteration. It is ok for this number to be
+ // greater than totalParts because we actually cap it at totalParts in runJob.
+ var numPartsToTry = 1
+ if (partsScanned > 0) {
+ // If we didn't find any rows after the previous iteration, quadruple and retry.
+ // Otherwise, interpolate the number of partitions we need to try, but overestimate
+ // it by 50%. We also cap the estimation in the end.
+ if (buf.size == 0) {
+ numPartsToTry = partsScanned * 4
+ } else {
+ // the left side of max is >=1 whenever partsScanned >= 2
+ numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
+ numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+ }
}
- }
- val left = num - buf.size
- val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
- val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
+ val left = num - buf.size
+ val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
+ val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
- res.foreach(buf ++= _.take(num - buf.size))
- partsScanned += numPartsToTry
- }
+ res.foreach(buf ++= _.take(num - buf.size))
+ partsScanned += numPartsToTry
+ }
- buf.toArray
+ buf.toArray
+ }
}
/**
* Return the first element in this RDD.
*/
- def first(): T = take(1) match {
- case Array(t) => t
- case _ => throw new UnsupportedOperationException("empty collection")
+ def first(): T = withScope {
+ take(1) match {
+ case Array(t) => t
+ case _ => throw new UnsupportedOperationException("empty collection")
+ }
}
/**
@@ -1230,7 +1279,9 @@ abstract class RDD[T: ClassTag](
* @param ord the implicit ordering for T
* @return an array of top elements
*/
- def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
+ def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
+ takeOrdered(num)(ord.reverse)
+ }
/**
* Returns the first k (smallest) elements from this RDD as defined by the specified
@@ -1248,7 +1299,7 @@ abstract class RDD[T: ClassTag](
* @param ord the implicit ordering for T
* @return an array of top elements
*/
- def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
+ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
if (num == 0) {
Array.empty
} else {
@@ -1273,13 +1324,17 @@ abstract class RDD[T: ClassTag](
* Returns the max of this RDD as defined by the implicit Ordering[T].
* @return the maximum element of the RDD
* */
- def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)
+ def max()(implicit ord: Ordering[T]): T = withScope {
+ this.reduce(ord.max)
+ }
/**
* Returns the min of this RDD as defined by the implicit Ordering[T].
* @return the minimum element of the RDD
* */
- def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
+ def min()(implicit ord: Ordering[T]): T = withScope {
+ this.reduce(ord.min)
+ }
/**
* @note due to complications in the internal implementation, this method will raise an
@@ -1289,12 +1344,14 @@ abstract class RDD[T: ClassTag](
* @return true if and only if the RDD contains no elements at all. Note that an RDD
* may be empty even when it has at least 1 partition.
*/
- def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0
+ def isEmpty(): Boolean = withScope {
+ partitions.length == 0 || take(1).length == 0
+ }
/**
* Save this RDD as a text file, using string representations of elements.
*/
- def saveAsTextFile(path: String) {
+ def saveAsTextFile(path: String): Unit = withScope {
// https://issues.apache.org/jira/browse/SPARK-2075
//
// NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
@@ -1321,7 +1378,7 @@ abstract class RDD[T: ClassTag](
/**
* Save this RDD as a compressed text file, using string representations of elements.
*/
- def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
+ def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
// https://issues.apache.org/jira/browse/SPARK-2075
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
@@ -1339,7 +1396,7 @@ abstract class RDD[T: ClassTag](
/**
* Save this RDD as a SequenceFile of serialized objects.
*/
- def saveAsObjectFile(path: String) {
+ def saveAsObjectFile(path: String): Unit = withScope {
this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
.saveAsSequenceFile(path)
@@ -1348,12 +1405,12 @@ abstract class RDD[T: ClassTag](
/**
* Creates tuples of the elements in this RDD by applying `f`.
*/
- def keyBy[K](f: T => K): RDD[(K, T)] = {
+ def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
map(x => (f(x), x))
}
/** A private method for tests, to look at the contents of each partition */
- private[spark] def collectPartitions(): Array[Array[T]] = {
+ private[spark] def collectPartitions(): Array[Array[T]] = withScope {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
}
@@ -1392,6 +1449,17 @@ abstract class RDD[T: ClassTag](
/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
@transient private[spark] val creationSite = sc.getCallSite()
+ /**
+ * The scope associated with the operation that created this RDD.
+ *
+ * This is more flexible than the call site and can be defined hierarchically. For more
+ * detail, see the documentation of {{RDDOperationScope}}. This scope is not defined if the
+ * user instantiates this RDD himself without using any Spark operations.
+ */
+ @transient private[spark] val scope: Option[RDDOperationScope] = {
+ Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
+ }
+
private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
private[spark] def elementClassTag: ClassTag[T] = classTag[T]
@@ -1470,7 +1538,7 @@ abstract class RDD[T: ClassTag](
/** A description of this RDD and its recursive dependencies for debugging. */
def toDebugString: String = {
// Get a debug description of an rdd without its children
- def debugSelf (rdd: RDD[_]): Seq[String] = {
+ def debugSelf(rdd: RDD[_]): Seq[String] = {
import Utils.bytesToString
val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
@@ -1527,10 +1595,11 @@ abstract class RDD[T: ClassTag](
case (desc: String, _) => s"$nextPrefix$desc"
} ++ debugChildren(rdd, nextPrefix)
}
- def debugString(rdd: RDD[_],
- prefix: String = "",
- isShuffle: Boolean = true,
- isLastChild: Boolean = false): Seq[String] = {
+ def debugString(
+ rdd: RDD[_],
+ prefix: String = "",
+ isShuffle: Boolean = true,
+ isLastChild: Boolean = false): Seq[String] = {
if (isShuffle) {
shuffleDebugString(rdd, prefix, isLastChild)
} else {
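
The recurring change in RDD.scala above is mechanical: every transformation body is wrapped in withScope, which stashes a named scope in a SparkContext local property so that any RDD instantiated inside the body can be attributed to the enclosing operation. Below is a minimal self-contained sketch of that dynamic-scoping pattern; DemoScope and everything in it is hypothetical, not Spark API. The real code keeps the scope as JSON in a local property and disallows nesting unless allowNesting is set, whereas this sketch nests freely to show the hierarchy.

object DemoScope {
  private val current = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  // Run `body` with `name` appended to the active scope, restoring the old
  // scope afterwards -- even if `body` throws.
  def withScope[T](name: String)(body: => T): T = {
    val old = current.get()
    current.set(Some(old.map(_ + " / " + name).getOrElse(name)))
    try body finally current.set(old)
  }

  // What an RDD constructor would read to tag itself, cf. the new RDD.scope.
  def activeScope: Option[String] = current.get()
}

object DemoScopeApp extends App {
  DemoScope.withScope("map") {
    println(DemoScope.activeScope)       // Some(map)
    DemoScope.withScope("reduceByKey") {
      println(DemoScope.activeScope)     // Some(map / reduceByKey)
    }
  }
  println(DemoScope.activeScope)         // None
}

Saving the old value and restoring it in a finally block is what keeps the scope correct even when the body throws, mirroring the try/finally in RDDOperationScope.withScope below.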
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
new file mode 100644
index 0000000..537b56b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude, JsonPropertyOrder}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.spark.SparkContext
+
+/**
+ * A general, named code block representing an operation that instantiates RDDs.
+ *
+ * All RDDs instantiated in the corresponding code block will store a pointer to this object.
+ * Examples include, but will not be limited to, existing RDD operations, such as textFile,
+ * reduceByKey, and treeAggregate.
+ *
+ * An operation scope may be nested in other scopes. For instance, a SQL query may enclose
+ * scopes associated with the public RDD APIs it uses under the hood.
+ *
+ * There is no particular relationship between an operation scope and a stage or a job.
+ * A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take).
+ */
+@JsonInclude(Include.NON_NULL)
+@JsonPropertyOrder(Array("id", "name", "parent"))
+private[spark] class RDDOperationScope(
+ val name: String,
+ val parent: Option[RDDOperationScope] = None) {
+
+ val id: Int = RDDOperationScope.nextScopeId()
+
+ def toJson: String = {
+ RDDOperationScope.jsonMapper.writeValueAsString(this)
+ }
+
+ /**
+ * Return a list of scopes that this scope is a part of, including this scope itself.
+ * The result is ordered from the outermost scope (eldest ancestor) to this scope.
+ */
+ @JsonIgnore
+ def getAllScopes: Seq[RDDOperationScope] = {
+ parent.map(_.getAllScopes).getOrElse(Seq.empty) ++ Seq(this)
+ }
+
+ override def equals(other: Any): Boolean = {
+ other match {
+ case s: RDDOperationScope =>
+ id == s.id && name == s.name && parent == s.parent
+ case _ => false
+ }
+ }
+
+ override def toString: String = toJson
+}
+
+/**
+ * A collection of utility methods to construct a hierarchical representation of RDD scopes.
+ * An RDD scope tracks the series of operations that created a given RDD.
+ */
+private[spark] object RDDOperationScope {
+ private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+ private val scopeCounter = new AtomicInteger(0)
+
+ def fromJson(s: String): RDDOperationScope = {
+ jsonMapper.readValue(s, classOf[RDDOperationScope])
+ }
+
+ /** Return a globally unique operation scope ID. */
+ def nextScopeId(): Int = scopeCounter.getAndIncrement
+
+ /**
+ * Execute the given body such that all RDDs created in this body will have the same scope.
+ * The name of the scope will be the name of the method that immediately encloses this one.
+ *
+ * Note: Return statements are NOT allowed in body.
+ */
+ private[spark] def withScope[T](
+ sc: SparkContext,
+ allowNesting: Boolean = false)(body: => T): T = {
+ val callerMethodName = Thread.currentThread.getStackTrace()(3).getMethodName
+ withScope[T](sc, callerMethodName, allowNesting)(body)
+ }
+
+ /**
+ * Execute the given body such that all RDDs created in this body will have the same scope.
+ *
+ * If nesting is allowed, this concatenates the previous scope with the new one in a way that
+ * signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to
+ * this method executed in the body will have no effect.
+ *
+ * Note: Return statements are NOT allowed in body.
+ */
+ private[spark] def withScope[T](
+ sc: SparkContext,
+ name: String,
+ allowNesting: Boolean = false)(body: => T): T = {
+ // Save the old scope to restore it later
+ val scopeKey = SparkContext.RDD_SCOPE_KEY
+ val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
+ val oldScopeJson = sc.getLocalProperty(scopeKey)
+ val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
+ val oldNoOverride = sc.getLocalProperty(noOverrideKey)
+ try {
+ // Set the scope only if the higher level caller allows us to do so
+ if (sc.getLocalProperty(noOverrideKey) == null) {
+ sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
+ }
+ // Optionally disallow the child body to override our scope
+ if (!allowNesting) {
+ sc.setLocalProperty(noOverrideKey, "true")
+ }
+ body
+ } finally {
+ // Remember to restore any state that was modified before exiting
+ sc.setLocalProperty(scopeKey, oldScopeJson)
+ sc.setLocalProperty(noOverrideKey, oldNoOverride)
+ }
+ }
+}
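
The scope reaches the web UI as JSON via jackson-module-scala, the same mapper configuration registered above. Here is a sketch of what that serialization looks like, assuming jackson-databind and jackson-module-scala are on the classpath; DemoScope is an illustrative stand-in for RDDOperationScope, which additionally assigns its id from a global counter and carries the @JsonInclude annotation on the class itself.

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Illustrative stand-in for RDDOperationScope (hypothetical class).
case class DemoScope(id: Int, name: String, parent: Option[DemoScope] = None)

object ScopeJsonDemo extends App {
  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
  mapper.setSerializationInclusion(Include.NON_NULL)

  val scope = DemoScope(1, "reduceByKey", parent = Some(DemoScope(0, "wordCount")))
  // Nested parents become nested JSON objects, which is the hierarchy the
  // front-end walks to draw scope clusters. Output is roughly:
  //   {"id":1,"name":"reduceByKey","parent":{"id":0,"name":"wordCount"}}
  println(mapper.writeValueAsString(scope))
}

With NON_NULL inclusion an absent parent is dropped rather than emitted as null, so a consumer only ever sees nested objects for real ancestors.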
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 059f896..3dfcf67 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -85,7 +85,9 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
* byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported
* file system.
*/
- def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
+ def saveAsSequenceFile(
+ path: String,
+ codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
def anyToWritable[U <% Writable](u: U): Writable = u
// TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index cf3db0b..e439d2a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -33,6 +33,7 @@ class StageInfo(
val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
+ val parentIds: Seq[Int],
val details: String) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
@@ -78,6 +79,7 @@ private[spark] object StageInfo {
stage.name,
numTasks.getOrElse(stage.numTasks),
rddInfos,
+ stage.parents.map(_.id),
stage.details)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index ad53a3e..9606262 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -18,7 +18,7 @@
package org.apache.spark.storage
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDDOperationScope, RDD}
import org.apache.spark.util.Utils
@DeveloperApi
@@ -26,7 +26,9 @@ class RDDInfo(
val id: Int,
val name: String,
val numPartitions: Int,
- var storageLevel: StorageLevel)
+ var storageLevel: StorageLevel,
+ val parentIds: Seq[Int],
+ val scope: Option[RDDOperationScope] = None)
extends Ordered[RDDInfo] {
var numCachedPartitions = 0
@@ -52,7 +54,8 @@ class RDDInfo(
private[spark] object RDDInfo {
def fromRdd(rdd: RDD[_]): RDDInfo = {
- val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
- new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel)
+ val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
+ val parentIds = rdd.dependencies.map(_.rdd.id)
+ new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope)
}
}
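
RDDInfo now records parentIds straight from rdd.dependencies, which is all the UI needs to rebuild lineage edges. A hedged sketch of that reconstruction follows; DemoRDDInfo and the sample lineage are made up for illustration.

// The UI side can recover the DAG edges from (id, parentIds) pairs
// with a single flatMap.
case class DemoRDDInfo(id: Int, name: String, parentIds: Seq[Int])

object LineageDemo extends App {
  val infos = Seq(
    DemoRDDInfo(0, "HadoopRDD", Nil),
    DemoRDDInfo(1, "MapPartitionsRDD", Seq(0)),
    DemoRDDInfo(2, "ShuffledRDD", Seq(1)))

  // One (parent -> child) edge per dependency, analogous to the incoming/
  // outgoing edge <div>s emitted by UIUtils.showDagViz.
  val edges = infos.flatMap(i => i.parentIds.map(p => (p, i.id)))
  println(edges)  // List((0,1), (1,2))
}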
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 06fce86..a5271f0 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -25,6 +25,7 @@ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
import org.apache.spark.ui.storage.{StorageListener, StorageTab}
+import org.apache.spark.ui.scope.RDDOperationGraphListener
/**
* Top level user interface for a Spark application.
@@ -38,6 +39,7 @@ private[spark] class SparkUI private (
val executorsListener: ExecutorsListener,
val jobProgressListener: JobProgressListener,
val storageListener: StorageListener,
+ val operationGraphListener: RDDOperationGraphListener,
var appName: String,
val basePath: String)
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
@@ -93,6 +95,9 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
private[spark] object SparkUI {
val DEFAULT_PORT = 4040
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
+ val DEFAULT_POOL_NAME = "default"
+ val DEFAULT_RETAINED_STAGES = 1000
+ val DEFAULT_RETAINED_JOBS = 1000
def getUIPort(conf: SparkConf): Int = {
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
@@ -144,13 +149,16 @@ private[spark] object SparkUI {
val storageStatusListener = new StorageStatusListener
val executorsListener = new ExecutorsListener(storageStatusListener)
val storageListener = new StorageListener(storageStatusListener)
+ val operationGraphListener = new RDDOperationGraphListener(conf)
listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
+ listenerBus.addListener(operationGraphListener)
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
- executorsListener, _jobProgressListener, storageListener, appName, basePath)
+ executorsListener, _jobProgressListener, storageListener, operationGraphListener,
+ appName, basePath)
}
}
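
RDDOperationGraphListener follows the standard listener-bus pattern wired up above: it receives scheduler events and assembles its graphs from the StageInfo/RDDInfo objects that now carry parentIds and scope. A minimal sketch of such a listener, assuming the Spark 1.x SparkListener trait; StageLoggingListener is hypothetical.

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

// Logs each submitted stage and the lineage recorded in its RDDInfos.
class StageLoggingListener extends SparkListener {
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    val info = stageSubmitted.stageInfo
    println(s"stage ${info.stageId}: RDDs " +
      info.rddInfos.map(i => s"${i.id}<-${i.parentIds.mkString("[", ",", "]")}").mkString(", "))
  }
}

// Registered the same way the UI does it (listenerBus.addListener(...)), or
// from user code via sc.addSparkListener(new StageLoggingListener).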
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 395af2e..2f3fb18 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -23,6 +23,7 @@ import java.util.{Locale, Date}
import scala.xml.{Node, Text}
import org.apache.spark.Logging
+import org.apache.spark.ui.scope.RDDOperationGraph
/** Utility functions for generating XML pages with spark content. */
private[spark] object UIUtils extends Logging {
@@ -172,13 +173,21 @@ private[spark] object UIUtils extends Logging {
<script src={prependBaseUri("/static/timeline-view.js")}></script>
}
+ def vizHeaderNodes: Seq[Node] = {
+ <script src={prependBaseUri("/static/d3.min.js")}></script>
+ <script src={prependBaseUri("/static/dagre-d3.min.js")}></script>
+ <script src={prependBaseUri("/static/graphlib-dot.min.js")}></script>
+ <script src={prependBaseUri("/static/spark-dag-viz.js")}></script>
+ }
+
/** Returns a spark page with correctly formatted headers */
def headerSparkPage(
title: String,
content: => Seq[Node],
activeTab: SparkUITab,
refreshInterval: Option[Int] = None,
- helpText: Option[String] = None): Seq[Node] = {
+ helpText: Option[String] = None,
+ showVisualization: Boolean = false): Seq[Node] = {
val appName = activeTab.appName
val shortAppName = if (appName.length < 36) appName else appName.take(32) + "..."
@@ -196,6 +205,7 @@ private[spark] object UIUtils extends Logging {
<html>
<head>
{commonHeaderNodes}
+ {if (showVisualization) vizHeaderNodes else Seq.empty}
<title>{appName} - {title}</title>
</head>
<body>
@@ -320,4 +330,47 @@ private[spark] object UIUtils extends Logging {
<div class="bar bar-running" style={startWidth}></div>
</div>
}
+
+ /** Return a "DAG visualization" DOM element that expands into a visualization for a stage. */
+ def showDagVizForStage(stageId: Int, graph: Option[RDDOperationGraph]): Seq[Node] = {
+ showDagViz(graph.toSeq, forJob = false)
+ }
+
+ /** Return a "DAG visualization" DOM element that expands into a visualization for a job. */
+ def showDagVizForJob(jobId: Int, graphs: Seq[RDDOperationGraph]): Seq[Node] = {
+ showDagViz(graphs, forJob = true)
+ }
+
+ /**
+ * Return a "DAG visualization" DOM element that expands into a visualization on the UI.
+ *
+ * This populates metadata necessary for generating the visualization on the front-end in
+ * a format that is expected by spark-dag-viz.js. Any changes in the format here must be
+ * reflected there.
+ */
+ private def showDagViz(graphs: Seq[RDDOperationGraph], forJob: Boolean): Seq[Node] = {
+ <div>
+ <span class="expand-dag-viz" onclick={s"toggleDagViz($forJob);"}>
+ <span class="expand-dag-viz-arrow arrow-closed"></span>
+ <strong>DAG visualization</strong>
+ </span>
+ <div id="dag-viz-graph"></div>
+ <div id="dag-viz-metadata">
+ {
+ graphs.map { g =>
+ <div class="stage-metadata" stageId={g.rootCluster.id} style="display:none">
+ <div class="dot-file">{RDDOperationGraph.makeDotFile(g, forJob)}</div>
+ { g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
+ { g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
+ {
+ g.rootCluster.getAllNodes.filter(_.cached).map { n =>
+ <div class="cached-rdd">{n.id}</div>
+ }
+ }
+ </div>
+ }
+ }
+ </div>
+ </div>
+ }
}
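
The hidden metadata div above embeds one DOT description per stage (via RDDOperationGraph.makeDotFile) that graphlib-dot.min.js parses and dagre-d3 lays out. To make that hand-off concrete, here is a toy DOT emitter in the same spirit; the exact format is defined by makeDotFile and spark-dag-viz.js, so the shape below is an assumption.

object DotDemo extends App {
  // Emit a digraph with one cluster (the scope) wrapping the RDD nodes.
  def makeDot(scope: String, nodes: Seq[(Int, String)], edges: Seq[(Int, Int)]): String = {
    val nodeLines = nodes.map { case (id, name) => s"""    $id [label="$name"];""" }
    val edgeLines = edges.map { case (from, to) => s"  $from->$to;" }
    (Seq("digraph G {",
         "  subgraph cluster_0 {",
         s"""    label="$scope";""") ++
      nodeLines ++
      Seq("  }") ++
      edgeLines ++
      Seq("}")).mkString("\n")
  }

  println(makeDot("reduceByKey",
    nodes = Seq(1 -> "MapPartitionsRDD", 2 -> "ShuffledRDD"),
    edges = Seq(1 -> 2)))
}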
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index a7ea12b..f6abf27 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -179,7 +179,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
<span class="expand-application-timeline">
<span class="expand-application-timeline-arrow arrow-closed"></span>
- <strong>Event Timeline</strong>
+ <strong>Event timeline</strong>
</span> ++
<div id="application-timeline" class="collapsed">
<div class="control-panel">
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index 527f960..236bc8e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
/** Page showing list of all ongoing and recently finished stages and pools */
private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
private val sc = parent.sc
- private val listener = parent.listener
+ private val listener = parent.progressListener
private def isFairScheduler = parent.isFairScheduler
def render(request: HttpServletRequest): Seq[Node] = {
@@ -42,18 +42,18 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
val activeStagesTable =
new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
- parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+ parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
killEnabled = parent.killEnabled)
val pendingStagesTable =
new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse,
- parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+ parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
killEnabled = false)
val completedStagesTable =
new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
- parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
+ parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
val failedStagesTable =
new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
- parent.listener, isFairScheduler = parent.isFairScheduler)
+ parent.progressListener, isFairScheduler = parent.isFairScheduler)
// For now, pool information is only accessible in live UIs
val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 1f8536d..d5cdbfa 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
/** Stage summary grouped by executors. */
private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: StagesTab) {
- private val listener = parent.listener
+ private val listener = parent.progressListener
def toNodeSeq: Seq[Node] = {
listener.synchronized {
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index dd968e1..96cc3d7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -31,6 +31,7 @@ import org.apache.spark.ui.jobs.UIData.ExecutorUIData
/** Page showing statistics and stage list for a given job */
private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
+
private val STAGES_LEGEND =
<div class="legend-area"><svg width="150px" height="85px">
<rect class="completed-stage-legend"
@@ -133,7 +134,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
events.toSeq
}
- private def makeTimeline(
+ private def makeTimeline(
stages: Seq[StageInfo],
executors: HashMap[String, ExecutorUIData],
appStartTime: Long): Seq[Node] = {
@@ -160,7 +161,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
<span class="expand-job-timeline">
<span class="expand-job-timeline-arrow arrow-closed"></span>
- <strong>Event Timeline</strong>
+ <strong>Event timeline</strong>
</span> ++
<div id="job-timeline" class="collapsed">
<div class="control-panel">
@@ -198,7 +199,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
// This could be empty if the JobProgressListener hasn't received information about the
// stage or if the stage information has been garbage collected
listener.stageIdToInfo.getOrElse(stageId,
- new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, "Unknown"))
+ new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
}
val activeStages = Buffer[StageInfo]()
@@ -303,9 +304,14 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
var content = summary
val appStartTime = listener.startTime
val executorListener = parent.executorListener
+ val operationGraphListener = parent.operationGraphListener
+
content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
executorListener.executorIdToData, appStartTime)
+ content ++= UIUtils.showDagVizForJob(
+ jobId, operationGraphListener.getOperationGraphForJob(jobId))
+
if (shouldShowActiveStages) {
content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
activeStagesTable.toNodeSeq
@@ -326,7 +332,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
failedStagesTable.toNodeSeq
}
- UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent)
+ UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, showVisualization = true)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index d6d716d..8f9aa9f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.UIData._
/**
@@ -38,8 +39,6 @@ import org.apache.spark.ui.jobs.UIData._
@DeveloperApi
class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
- import JobProgressListener._
-
// Define a handful of type aliases so that data structures' types can serve as documentation.
// These type aliases are public because they're used in the types of public fields:
@@ -86,8 +85,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// To limit the total memory usage of JobProgressListener, we only track information for a fixed
// number of non-active jobs and stages (there is no limit for active jobs and stages):
- val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
- val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+ val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
+ val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
// We can test for memory leaks by ensuring that collections that track non-active jobs and
// stages do not grow without bound and that collections for active jobs/stages eventually become
@@ -288,8 +287,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
activeStages(stage.stageId) = stage
pendingStages.remove(stage.stageId)
val poolName = Option(stageSubmitted.properties).map {
- p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
- }.getOrElse(DEFAULT_POOL_NAME)
+ p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME)
+ }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
stageIdToInfo(stage.stageId) = stage
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
@@ -524,9 +523,3 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
startTime = appStarted.time
}
}
-
-private object JobProgressListener {
- val DEFAULT_POOL_NAME = "default"
- val DEFAULT_RETAINED_STAGES = 1000
- val DEFAULT_RETAINED_JOBS = 1000
-}
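
The retention limits above are ordinary Spark configuration keys, so they can be tuned per application; only the fallback constants moved to the SparkUI companion object so other listeners can share them. A minimal standalone sketch of the lookup (the key names are the real ones read above; the values and the object name are made up for illustration):

    import org.apache.spark.SparkConf

    object RetentionConfigSketch {
      def main(args: Array[String]): Unit = {
        // Arbitrary example values; 1000 mirrors the shared default.
        val conf = new SparkConf()
          .set("spark.ui.retainedStages", "500")
          .set("spark.ui.retainedJobs", "200")
        val retainedStages = conf.getInt("spark.ui.retainedStages", 1000)
        println(s"The UI will track at most $retainedStages completed stages")
      }
    }
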
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 342787f..77ca60b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -24,10 +24,12 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
val sc = parent.sc
val killEnabled = parent.killEnabled
- def isFairScheduler: Boolean =
- jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR)
val jobProgresslistener = parent.jobProgressListener
val executorListener = parent.executorsListener
+ val operationGraphListener = parent.operationGraphListener
+
+ def isFairScheduler: Boolean =
+ jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR)
attachPage(new AllJobsPage(this))
attachPage(new JobPage(this))
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index f47cdc9..d725b9d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
/** Page showing specific pool details */
private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
private val sc = parent.sc
- private val listener = parent.listener
+ private val listener = parent.progressListener
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
@@ -40,7 +40,7 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
case None => Seq[StageInfo]()
}
val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
- parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+ parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
killEnabled = parent.killEnabled)
// For now, pool information is only accessible in live UIs
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index df1899e..9ba2af5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -25,7 +25,7 @@ import org.apache.spark.ui.UIUtils
/** Table showing list of pools */
private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
- private val listener = parent.listener
+ private val listener = parent.progressListener
def toNodeSeq: Seq[Node] = {
listener.synchronized {
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 797c940..5793100 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -27,15 +27,17 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData._
+import org.apache.spark.ui.scope.RDDOperationGraph
import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
- private val listener = parent.listener
+ private val progressListener = parent.progressListener
+ private val operationGraphListener = parent.operationGraphListener
def render(request: HttpServletRequest): Seq[Node] = {
- listener.synchronized {
+ progressListener.synchronized {
val parameterId = request.getParameter("id")
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
@@ -44,7 +46,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val stageId = parameterId.toInt
val stageAttemptId = parameterAttempt.toInt
- val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId))
+ val stageDataOption = progressListener.stageIdToData.get((stageId, stageAttemptId))
if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
val content =
@@ -60,7 +62,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
val numCompleted = tasks.count(_.taskInfo.finished)
- val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables
+ val accumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables
val hasAccumulators = accumulables.size > 0
val summary =
@@ -169,6 +171,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</div>
</div>
+ val dagViz = UIUtils.showDagVizForStage(
+ stageId, operationGraphListener.getOperationGraphForStage(stageId))
+
val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
def accumulableRow(acc: AccumulableInfo): Elem =
<tr><td>{acc.name}</td><td>{acc.value}</td></tr>
@@ -434,13 +439,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val content =
summary ++
showAdditionalMetrics ++
+ dagViz ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
maybeAccumulableTable ++
<h4>Tasks</h4> ++ taskTable
- UIUtils.headerSparkPage("Details for Stage %d".format(stageId), content, parent)
+ UIUtils.headerSparkPage(
+ "Details for Stage %d".format(stageId), content, parent, showVisualization = true)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index 1bd2d87..5516995 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -26,19 +26,20 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
val sc = parent.sc
val killEnabled = parent.killEnabled
- val listener = parent.jobProgressListener
+ val progressListener = parent.jobProgressListener
+ val operationGraphListener = parent.operationGraphListener
attachPage(new AllStagesPage(this))
attachPage(new StagePage(this))
attachPage(new PoolPage(this))
- def isFairScheduler: Boolean = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
+ def isFairScheduler: Boolean = progressListener.schedulingMode.exists(_ == SchedulingMode.FAIR)
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
- if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
+ if (stageId >= 0 && killFlag && progressListener.activeStages.contains(stageId)) {
sc.get.cancelStage(stageId)
}
// Do a quick pause here to give Spark time to kill the stage so it shows up as
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
new file mode 100644
index 0000000..a18c193
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.scope
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.Logging
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A representation of a generic cluster graph used for storing information on RDD operations.
+ *
+ * Each graph is defined by a set of edges and a root cluster, which may contain child
+ * nodes and child clusters. Additionally, a graph may have edges that enter or exit
+ * the graph from nodes that belong to adjacent graphs.
+ */
+private[ui] case class RDDOperationGraph(
+ edges: Seq[RDDOperationEdge],
+ outgoingEdges: Seq[RDDOperationEdge],
+ incomingEdges: Seq[RDDOperationEdge],
+ rootCluster: RDDOperationCluster)
+
+/** A node in an RDDOperationGraph. This represents an RDD. */
+private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean)
+
+/**
+ * A directed edge connecting two nodes in an RDDOperationGraph.
+ * This represents an RDD dependency.
+ */
+private[ui] case class RDDOperationEdge(fromId: Int, toId: Int)
+
+/**
+ * A cluster that groups nodes together in an RDDOperationGraph.
+ *
+ * This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap),
+ * stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
+ */
+private[ui] class RDDOperationCluster(val id: String, val name: String) {
+ private val _childNodes = new ListBuffer[RDDOperationNode]
+ private val _childClusters = new ListBuffer[RDDOperationCluster]
+
+ def childNodes: Seq[RDDOperationNode] = _childNodes.iterator.toSeq
+ def childClusters: Seq[RDDOperationCluster] = _childClusters.iterator.toSeq
+ def attachChildNode(childNode: RDDOperationNode): Unit = { _childNodes += childNode }
+ def attachChildCluster(childCluster: RDDOperationCluster): Unit = {
+ _childClusters += childCluster
+ }
+
+ /** Return all the nodes contained in this cluster, including ones nested in other clusters. */
+ def getAllNodes: Seq[RDDOperationNode] = {
+ _childNodes ++ _childClusters.flatMap(_.getAllNodes)
+ }
+}
+
+private[ui] object RDDOperationGraph extends Logging {
+
+ /**
+ * Construct an RDDOperationGraph for a given stage.
+ *
+ * The root cluster represents the stage, and all children clusters represent RDD operations.
+ * Each node represents an RDD, and each edge represents a dependency between two RDDs pointing
+ * from the parent to the child.
+ *
+ * This does not currently merge common operation scopes across stages. This may be worth
+ * supporting in the future if we decide to group certain stages within the same job under
+ * a common scope (e.g. part of a SQL query).
+ */
+ def makeOperationGraph(stage: StageInfo): RDDOperationGraph = {
+ val edges = new ListBuffer[RDDOperationEdge]
+ val nodes = new mutable.HashMap[Int, RDDOperationNode]
+ val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
+
+ // Root cluster is the stage cluster
+ val stageClusterId = s"stage_${stage.stageId}"
+ val stageClusterName = s"Stage ${stage.stageId}" +
+ { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
+ val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
+
+ // Find nodes, edges, and operation scopes that belong to this stage
+ stage.rddInfos.foreach { rdd =>
+ edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
+
+ // TODO: differentiate between the intention to cache an RDD and whether it's actually cached
+ val node = nodes.getOrElseUpdate(
+ rdd.id, RDDOperationNode(rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE))
+
+ if (rdd.scope == null) {
+ // This RDD has no encompassing scope, so we put it directly in the root cluster
+ // This should happen only if an RDD is instantiated outside of a public RDD API
+ rootCluster.attachChildNode(node)
+ } else {
+ // Otherwise, this RDD belongs to an inner cluster,
+ // which may be nested inside of other clusters
+ val rddScopes = rdd.scope.map { scope => scope.getAllScopes }.getOrElse(Seq.empty)
+ val rddClusters = rddScopes.map { scope =>
+ val clusterId = scope.name + "_" + scope.id
+ val clusterName = scope.name
+ clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName))
+ }
+ // Build the cluster hierarchy for this RDD
+ rddClusters.sliding(2).foreach { pc =>
+ if (pc.size == 2) {
+ val parentCluster = pc(0)
+ val childCluster = pc(1)
+ parentCluster.attachChildCluster(childCluster)
+ }
+ }
+ // Attach the outermost cluster to the root cluster, and the RDD to the innermost cluster
+ rddClusters.headOption.foreach { cluster => rootCluster.attachChildCluster(cluster) }
+ rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+ }
+ }
+
+ // Classify each edge as internal, outgoing or incoming
+ // This information is needed to reason about how stages relate to each other
+ val internalEdges = new ListBuffer[RDDOperationEdge]
+ val outgoingEdges = new ListBuffer[RDDOperationEdge]
+ val incomingEdges = new ListBuffer[RDDOperationEdge]
+ edges.foreach { e =>
+ val fromThisGraph = nodes.contains(e.fromId)
+ val toThisGraph = nodes.contains(e.toId)
+ (fromThisGraph, toThisGraph) match {
+ case (true, true) => internalEdges += e
+ case (true, false) => outgoingEdges += e
+ case (false, true) => incomingEdges += e
+ // should never happen
+ case _ => logWarning(s"Found an orphan edge in stage ${stage.stageId}: $e")
+ }
+ }
+
+ RDDOperationGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster)
+ }
+
+ /**
+ * Generate the content of a dot file that describes the specified graph.
+ *
+ * Note that this uses only a minimal subset of the features available in the DOT
+ * specification. Part of the styling must be done here because the rendering library
+ * must take certain attributes into account when arranging the graph elements. More
+ * styling is added later in the visualization through post-processing in JavaScript.
+ *
+ * For the complete DOT specification, see http://www.graphviz.org/Documentation/dotguide.pdf.
+ */
+ def makeDotFile(graph: RDDOperationGraph, forJob: Boolean): String = {
+ val dotFile = new StringBuilder
+ dotFile.append("digraph G {\n")
+ dotFile.append(makeDotSubgraph(graph.rootCluster, forJob, indent = " "))
+ graph.edges.foreach { edge =>
+ dotFile.append(s""" ${edge.fromId}->${edge.toId} [lineInterpolate="basis"];\n""")
+ }
+ dotFile.append("}")
+ val result = dotFile.toString()
+ logDebug(result)
+ result
+ }
+
+ /**
+ * Return the dot representation of a node in an RDDOperationGraph.
+ *
+ * On the job page, it is displayed as a small circle without a label.
+ * On the stage page, it is displayed as a box with an embedded label.
+ */
+ private def makeDotNode(node: RDDOperationNode, forJob: Boolean): String = {
+ if (forJob) {
+ s"""${node.id} [label=" " shape="circle" padding="5" labelStyle="font-size: 0"]"""
+ } else {
+ s"""${node.id} [label="${node.name} (${node.id})"]"""
+ }
+ }
+
+ /** Return the dot representation of a subgraph in an RDDOperationGraph. */
+ private def makeDotSubgraph(
+ scope: RDDOperationCluster,
+ forJob: Boolean,
+ indent: String): String = {
+ val subgraph = new StringBuilder
+ subgraph.append(indent + s"subgraph cluster${scope.id} {\n")
+ subgraph.append(indent + s""" label="${scope.name}";\n""")
+ scope.childNodes.foreach { node =>
+ subgraph.append(indent + s" ${makeDotNode(node, forJob)};\n")
+ }
+ scope.childClusters.foreach { cscope =>
+ subgraph.append(makeDotSubgraph(cscope, forJob, indent + " "))
+ }
+ subgraph.append(indent + "}\n")
+ subgraph.toString()
+ }
+}
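
To make the scope-to-cluster mapping concrete, here is a standalone sketch, not part of this patch, of how a chain of operation scopes becomes nested DOT subgraphs. Cluster is a simplified stand-in for RDDOperationCluster, and all names and ids are invented:

    import scala.collection.mutable.ListBuffer

    object DotNestingSketch {
      case class Cluster(id: String, name: String, children: ListBuffer[Cluster] = ListBuffer.empty)

      // Same recursion as makeDotSubgraph above, minus node rendering.
      def dotSubgraph(c: Cluster, indent: String): String = {
        val sb = new StringBuilder
        sb.append(indent + s"subgraph cluster${c.id} {\n")
        sb.append(indent + s"""  label="${c.name}";\n""")
        c.children.foreach { child => sb.append(dotSubgraph(child, indent + "  ")) }
        sb.append(indent + "}\n")
        sb.toString()
      }

      def main(args: Array[String]): Unit = {
        // An RDD whose scope chain is textFile -> flatMap yields one cluster per
        // scope; sliding(2) wires each parent scope to its child, as in
        // makeOperationGraph above.
        val scopes = Seq(Cluster("textFile_0", "textFile"), Cluster("flatMap_1", "flatMap"))
        scopes.sliding(2).foreach {
          case Seq(parent, child) => parent.children += child
          case _ => // a single scope has no parent/child pair to wire
        }
        val root = Cluster("stage_0", "Stage 0")
        scopes.headOption.foreach(root.children += _)
        print("digraph G {\n" + dotSubgraph(root, "  ") + "}\n")
      }
    }

Running this prints a digraph whose stage cluster nests a textFile cluster, which in turn nests a flatMap cluster, mirroring how the renderer later draws one box per scope.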
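
The internal/outgoing/incoming split near the end of makeOperationGraph can likewise be exercised on its own. A small sketch with invented RDD ids, where the stage owns nodes 2 and 3:

    object EdgeClassificationSketch {
      def main(args: Array[String]): Unit = {
        val nodesInStage = Set(2, 3)             // RDDs belonging to this stage
        val edges = Seq((1, 2), (2, 3), (3, 4))  // parent -> child dependencies
        edges.foreach { case (from, to) =>
          (nodesInStage(from), nodesInStage(to)) match {
            case (true, true)  => println(s"$from->$to internal to the stage")
            case (true, false) => println(s"$from->$to outgoing, consumed by a later stage")
            case (false, true) => println(s"$from->$to incoming, produced by an earlier stage")
            case _             => println(s"$from->$to orphan edge")
          }
        }
      }
    }

Here 1->2 is incoming, 2->3 is internal, and 3->4 is outgoing.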
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
new file mode 100644
index 0000000..2884a49
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.scope
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler._
+import org.apache.spark.ui.SparkUI
+
+/**
+ * A SparkListener that constructs a DAG of RDD operations.
+ */
+private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
+ private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
+ private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
+ private val stageIds = new mutable.ArrayBuffer[Int]
+
+ // How many stages to retain graph metadata for
+ private val retainedStages =
+ conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
+
+ /** Return the graphs for all stages in the given job, or an empty sequence if none exist. */
+ def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
+ jobIdToStageIds.get(jobId)
+ .map { sids => sids.flatMap { sid => stageIdToGraph.get(sid) } }
+ .getOrElse { Seq.empty }
+ }
+
+ /** Return the graph metadata for the given stage, or None if no such information exists. */
+ def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = {
+ stageIdToGraph.get(stageId)
+ }
+
+ /** On job start, construct an RDDOperationGraph for each stage in the job for display later. */
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
+ val jobId = jobStart.jobId
+ val stageInfos = jobStart.stageInfos
+
+ stageInfos.foreach { stageInfo =>
+ stageIds += stageInfo.stageId
+ stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+ }
+ jobIdToStageIds(jobId) = stageInfos.map(_.stageId).sorted
+
+ // Remove graph metadata for old stages
+ if (stageIds.size >= retainedStages) {
+ val toRemove = math.max(retainedStages / 10, 1)
+ stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
+ stageIds.trimStart(toRemove)
+ }
+ }
+}
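
The cleanup branch at the end of onJobStart evicts in batches rather than one entry at a time, so the cost of trimming is amortized. A standalone sketch of that policy with a tiny, made-up retention limit:

    import scala.collection.mutable

    object GraphEvictionSketch {
      def main(args: Array[String]): Unit = {
        val retainedStages = 10  // stand-in for spark.ui.retainedStages
        val stageIds = new mutable.ArrayBuffer[Int]
        val stageIdToGraph = new mutable.HashMap[Int, String]  // graphs stubbed as strings

        (1 to 25).foreach { id =>
          stageIds += id
          stageIdToGraph(id) = s"graph-for-stage-$id"
          // Same batching as above: once the limit is hit, drop the oldest
          // ~10% of stage ids and their graphs in one pass.
          if (stageIds.size >= retainedStages) {
            val toRemove = math.max(retainedStages / 10, 1)
            stageIds.take(toRemove).foreach { old => stageIdToGraph.remove(old) }
            stageIds.trimStart(toRemove)
          }
        }
        println(stageIds.mkString(", "))  // only the most recent ids survive
      }
    }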